Golang으로 대량 API 요청 처리: Excel 파일에서 데이터 읽기부터 병렬 처리까지
안녕하세요, 오늘은 Go 언어를 사용하여 Excel 파일에서 데이터를 읽어와 대량의 API 요청을 처리하는 방법에 대해 알아보겠습니다. 이 프로그램은 입력할 Excel 파일에서 데이터를 읽어와 body에 넣어서 API에 요청을 보내고, 응답을 파일에 저장하는 과정을 수행합니다.
1. 과정
Excel 파일에서 데이터 추출하기 (API request body에 넣을 값들)
API 요청 (병렬 처리를 위한 고루틴 사용 - WaitGroup)
응답 데이터 정렬 by index
결과를 파일로 저장 (csv)
로깅 (응답 데이터, 응답 시간 로그 파일 생성)
2. Excel 파일에서 원하는 데이터 추출하기
먼저, Excel 파일에서 데이터를 읽어오는 코드를 만듭니다.
package main
import (
"encoding/csv"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"time"
"github.com/tealeg/xlsx"
)
const (
FILE_PATH = "./sample.xlsx"
SHEET_INDEX = 1
COLUMN = 'F'
START_ROW = 2
END_ROW = 186
PREVIEW_LENGTH = 500
)
func currentDateString() string {
return time.Now().Format("20060102")
}
func readExcelColumn(filePath string, column rune, startRow, endRow int) ([]string, error) {
xlFile, err := xlsx.OpenFile(filePath)
if err != nil {
return nil, fmt.Errorf("error: file '%s' not found", filePath)
}
if SHEET_INDEX >= len(xlFile.Sheets) {
return nil, fmt.Errorf("error: sheet index out of range")
}
sheet := xlFile.Sheets[SHEET_INDEX]
columnIndex := int(column - 'A')
var columnData []string
for i := startRow - 1; i < endRow; i++ {
if i >= len(sheet.Rows) {
break
}
cell := sheet.Rows[i].Cells[columnIndex]
columnData = append(columnData, cell.String())
}
return columnData, nil
}
func processCellData(cellData string) string {
parts := strings.Split(cellData, "\n")
if len(parts) > 1 {
return strings.TrimSpace(parts[1])
}
return strings.TrimSpace(parts[0])
}
func processColumnData(columnData []string) []string {
var processedData []string
for _, cell := range columnData {
processedData = append(processedData, processCellData(cell))
}
return processedData
}
func saveToCSV(data []string, outputFile string) error {
file, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("error writing to file: %v", err)
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
for _, item := range data {
if err := writer.Write([]string{item}); err != nil {
return fmt.Errorf("error writing to file: %v", err)
}
}
fmt.Printf("Data has been saved to %s\n", outputFile)
return nil
}
func previewFile(filePath string, length int) error {
data, err := ioutil.ReadFile(filePath)
if err != nil {
return fmt.Errorf("error reading file for preview: %v", err)
}
fmt.Println("Preview of saved data:")
fmt.Println(string(data)[:length])
return nil
}
func main() {
currentDate := currentDateString()
outputFile := fmt.Sprintf("./body_%s.csv", currentDate)
columnData, err := readExcelColumn(FILE_PATH, COLUMN, START_ROW, END_ROW)
if err != nil {
log.Fatalf("Failed to read Excel column: %v", err)
}
if len(columnData) == 0 {
log.Println("No data to process. Check the Excel file and column specification.")
return
}
processedData := processColumnData(columnData)
if err := saveToCSV(processedData, outputFile); err != nil {
log.Fatalf("Failed to save to CSV: %v", err)
}
if err := previewFile(outputFile, PREVIEW_LENGTH); err != nil {
log.Fatalf("Failed to preview file: %v", err)
}
}
3. 읽어온 데이터를 바탕으로 다수의 API 요청 보내기
WaitGroup을 사용해서 동시성을 활용해 API를 병렬적으로 실행합니다.
동시 작업 추적
WaitGroup은 완료되어야 할 고루틴(goroutine)의 수를 추적합니다. 이는 여러 고루틴이 병렬로 실행될 때 유용합니다.
작업 완료 대기
Main Go routine 이 모든 작업이 완료될 때까지 기다릴 수 있게 해줍니다. 이를 통해 모든 병렬 작업이 끝날 때까지 프로그램이 종료되지 않도록 합니다.
카운터 메커니즘
WaitGroup은 내부적으로 카운터를 사용합니다. 이 카운터는 다음 메서드들로 조작됩니다:
- Add(delta int): 대기해야 할 고루틴의 수를 증가시킵니다.
- Done(): 고루틴 하나가 완료되었음을 알립니다 (내부적으로 Add(-1)과 동일).
- Wait(): 카운터가 0이 될 때까지 블록합니다.
그리고 1번 요청을 먼저 했다고 해도 30번째 요청의 응답이 먼저 들어올 수 있으므로 index를 지정해서 모든 요청이 완료된 순간, index를 기준으로 오름차순 정렬을 하게 했습니다.
이로써, 동시성 프로그래밍을 활용하여 30개의 API 요청 시 응답시간을 270s -> 9s로 (1개 요청 시 보통 9s) 시간을 단축하였습니다.
package main
import (
"bytes"
"encoding/csv"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"sort"
"strconv"
"sync"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// 상수 정의
const (
INPUT_FILE = "body_20240714.csv" // 입력 CSV 파일 이름
API_URL = "http://localhost:8000/api/v1/xxxx" // API URL
ASSISTANT_CONTENT = "" //
DATE_FORMAT = "20240712" // 날짜 형식
OUTPUT_CSV_FILE = "response_20240712.csv" // 출력 CSV 파일 이름
)
// CSV 파일 읽기 함수
func readCSV(filename string, logger *zap.Logger) ([]string, error) {
logger.Info("Reading CSV file", zap.String("filename", filename)) // CSV 파일 읽기 시작 로그
file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("Error opening file: %w", err) // 파일 열기 오류 시 에러 반환
}
defer file.Close()
reader := csv.NewReader(file)
var data []string
for {
record, err := reader.Read()
if err != nil {
break // 더 이상 읽을 레코드가 없으면 종료
}
if len(record) > 0 {
data = append(data, record[0]) // CSV 레코드의 첫 번째 필드만 저장
}
}
logger.Info("Successfully read records from CSV file", zap.Int("records", len(data))) // 성공적으로 읽은 레코드 수 로그
return data, nil
}
// API 요청 함수
func makeAPIRequest(userContent string, wg *sync.WaitGroup, results chan<- [2]string, index int, logger *zap.Logger) {
defer wg.Done() // 작업이 완료되면 WaitGroup의 작업 카운터를 줄임
logger.Info("Making API request", zap.Int("index", index), zap.String("userContent", userContent)) // API 요청 시작 로그
startTime := time.Now() // 요청 시작 시간 기록
requestData := []map[string]string{
// 원하는 데이터 요청 형식
}
jsonData, err := json.Marshal(requestData)
if err != nil {
results <- [2]string{fmt.Sprintf("Error marshalling JSON: %v", err), fmt.Sprintf("%d", index)}
logger.Error("Error marshalling JSON", zap.Int("index", index), zap.Error(err)) // JSON 변환 오류 로그
return
}
req, err := http.NewRequest("POST", API_URL, bytes.NewBuffer(jsonData))
if err != nil {
results <- [2]string{fmt.Sprintf("Error creating request: %v", err), fmt.Sprintf("%d", index)}
logger.Error("Error creating request", zap.Int("index", index), zap.Error(err)) // HTTP 요청 생성 오류 로그
return
}
req.Header.Set("Content-Type", "application/json") // 요청 헤더 설정
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
results <- [2]string{fmt.Sprintf("Error making API request: %v", err), fmt.Sprintf("%d", index)}
logger.Error("Error making API request", zap.Int("index", index), zap.Error(err)) // API 요청 오류 로그
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
results <- [2]string{fmt.Sprintf("Error reading response: %v", err), fmt.Sprintf("%d", index)}
logger.Error("Error reading response", zap.Int("index", index), zap.Error(err)) // 응답 읽기 오류 로그
return
}
var response map[string]interface{}
if err := json.Unmarshal(body, &response); err != nil {
results <- [2]string{fmt.Sprintf("Invalid response format: %s", string(body)), fmt.Sprintf("%d", index)}
logger.Error("Invalid response format", zap.Int("index", index), zap.String("body", string(body))) // 응답 형식 오류 로그
return
}
if output, ok := response["output"].(string); ok {
results <- [2]string{output, fmt.Sprintf("%d", index)}
duration := time.Since(startTime) // 요청 소요 시간 계산
logger.Info("Received valid response", zap.Int("index", index), zap.String("response", output), zap.Duration("duration", duration)) // 유효한 응답 로그 및 소요 시간 기록
} else {
results <- [2]string{"No output field in response", fmt.Sprintf("%d", index)}
logger.Warn("No output field in response", zap.Int("index", index)) // 응답에 output 필드가 없음 로그
}
}
// CSV 파일에 데이터 저장 함수
func saveToCSV(filename string, data [][2]string, logger *zap.Logger) error {
logger.Info("Saving data to CSV file", zap.String("filename", filename)) // CSV 파일 저장 시작 로그
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("Error creating CSV file: %w", err) // 파일 생성 오류 시 에러 반환
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
for _, value := range data {
if err := writer.Write([]string{value[0]}); err != nil {
return fmt.Errorf("Error writing record to CSV file: %w", err) // 레코드 작성 오류 시 에러 반환
}
}
logger.Info("Successfully saved data to CSV file") // 데이터 저장 성공 로그
return nil
}
func fileLogger(filename string) *zap.Logger {
config := zap.NewProductionEncoderConfig()
config.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.EncodeTime = zapcore.ISO8601TimeEncoder
fileEncoder := zapcore.NewJSONEncoder(config)
consoleEncoder := zapcore.NewConsoleEncoder(config)
logFile, _ := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
writer := zapcore.AddSync(logFile)
defaultLogLevel := zapcore.DebugLevel
core := zapcore.NewTee(
zapcore.NewCore(fileEncoder, writer, defaultLogLevel),
zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), defaultLogLevel),
)
logger := zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
return logger
}
func main() {
filename := "logs.log"
logger := fileLogger(filename)
defer logger.Sync()
var startIndex, endIndex int
var err error
// 터미널에서 시작 인덱스와 끝 인덱스를 입력받음
fmt.Print("startIndex(0~184): ") // 0 -> 2
_, err = fmt.Scan(&startIndex)
if err != nil {
logger.Fatal("Error reading start index", zap.Error(err)) // 시작 인덱스 읽기 오류 시 프로그램 종료
}
fmt.Print("endIndex(1~185): ")
_, err = fmt.Scan(&endIndex)
if err != nil {
logger.Fatal("Error reading end index", zap.Error(err)) // 끝 인덱스 읽기 오류 시 프로그램 종료
}
// CSV 파일에서 질문 읽기
questions, err := readCSV(INPUT_FILE, logger)
if err != nil {
logger.Fatal("Error reading CSV file", zap.Error(err)) // CSV 파일 읽기 오류 시 프로그램 종료
}
// 끝 인덱스가 0이거나 질문 수보다 큰 경우 질문 수로 설정
if endIndex == 0 || endIndex > len(questions) {
endIndex = len(questions)
}
var wg sync.WaitGroup
results := make(chan [2]string, endIndex-startIndex)
// 시작 인덱스부터 끝 인덱스까지 API 요청 생성
for i := startIndex; i < endIndex; i++ {
wg.Add(1) // WaitGroup 카운터 증가
go makeAPIRequest(questions[i], &wg, results, i, logger) // 고루틴으로 API 요청 실행
}
wg.Wait() // 모든 고루틴이 완료될 때까지 대기
close(results) // 채널 닫기
var responseData [][2]string
for response := range results {
responseData = append(responseData, response) // 응답 데이터 수집
}
// 응답 데이터를 index 기준으로 오름차순 정렬
sort.Slice(responseData, func(i, j int) bool {
indexI, _ := strconv.Atoi(responseData[i][1])
indexJ, _ := strconv.Atoi(responseData[j][1])
return indexI < indexJ
})
// 정렬된 데이터의 텍스트 내용만 CSV 파일에 저장
var sortedTextData [][2]string
for _, value := range responseData {
sortedTextData = append(sortedTextData, [2]string{value[0]})
}
// 응답 데이터를 CSV 파일에 저장
if err := saveToCSV(OUTPUT_CSV_FILE, sortedTextData, logger); err != nil {
logger.Fatal("Error saving to CSV", zap.Error(err)) // CSV 저장 오류 시 프로그램 종료
}
logger.Info("Processing complete", zap.String("outputFile", OUTPUT_CSV_FILE)) // 처리 완료 로그
fmt.Printf("Processing complete. Results saved to %s\n", OUTPUT_CSV_FILE) // 처리 완료 메시지 출력
}
읽어주셔서 감사합니다 🙇♂️
'개발 > Golang' 카테고리의 다른 글
sqlc 사용기: Go + PostgreSQL, SQL 기능을 하는 함수 생성 (2) | 2024.07.19 |
---|---|
Golang으로 백엔드 개발 도전기 (0) | 2024.07.12 |