728x90

Golang으로 대량 API 요청 처리: Excel 파일에서 데이터 읽기부터 병렬 처리까지

 

안녕하세요, 오늘은 Go 언어를 사용하여 Excel 파일에서 데이터를 읽어와 대량의 API 요청을 처리하는 방법에 대해 알아보겠습니다. 이 프로그램은 입력할 Excel 파일에서 데이터를 읽어와 body에 넣어서 API에 요청을 보내고, 응답을 파일에 저장하는 과정을 수행합니다. 

1. 과정

Excel 파일에서 데이터 추출하기 (API request body에 넣을 값들)

API 요청 (병렬 처리를 위한 고루틴 사용 - WaitGroup)

응답 데이터 정렬 by index

결과를 파일로 저장 (csv)

로깅 (응답 데이터, 응답 시간 로그 파일 생성)


2. Excel 파일에서 원하는 데이터 추출하기

먼저, Excel 파일에서 데이터를 읽어오는 코드를 만듭니다.

package main

import (
	"encoding/csv"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"strings"
	"time"

	"github.com/tealeg/xlsx"
)

const (
	FILE_PATH      = "./sample.xlsx"
	SHEET_INDEX    = 1
	COLUMN         = 'F'
	START_ROW      = 2
	END_ROW        = 186
	PREVIEW_LENGTH = 500
)

func currentDateString() string {
	return time.Now().Format("20060102")
}

func readExcelColumn(filePath string, column rune, startRow, endRow int) ([]string, error) {
	xlFile, err := xlsx.OpenFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("error: file '%s' not found", filePath)
	}

	if SHEET_INDEX >= len(xlFile.Sheets) {
		return nil, fmt.Errorf("error: sheet index out of range")
	}

	sheet := xlFile.Sheets[SHEET_INDEX]
	columnIndex := int(column - 'A')
	var columnData []string

	for i := startRow - 1; i < endRow; i++ {
		if i >= len(sheet.Rows) {
			break
		}
		cell := sheet.Rows[i].Cells[columnIndex]
		columnData = append(columnData, cell.String())
	}

	return columnData, nil
}

func processCellData(cellData string) string {
	parts := strings.Split(cellData, "\n")
	if len(parts) > 1 {
		return strings.TrimSpace(parts[1])
	}
	return strings.TrimSpace(parts[0])
}

func processColumnData(columnData []string) []string {
	var processedData []string
	for _, cell := range columnData {
		processedData = append(processedData, processCellData(cell))
	}
	return processedData
}

func saveToCSV(data []string, outputFile string) error {
	file, err := os.Create(outputFile)
	if err != nil {
		return fmt.Errorf("error writing to file: %v", err)
	}
	defer file.Close()

	writer := csv.NewWriter(file)
	defer writer.Flush()

	for _, item := range data {
		if err := writer.Write([]string{item}); err != nil {
			return fmt.Errorf("error writing to file: %v", err)
		}
	}

	fmt.Printf("Data has been saved to %s\n", outputFile)
	return nil
}

func previewFile(filePath string, length int) error {
	data, err := ioutil.ReadFile(filePath)
	if err != nil {
		return fmt.Errorf("error reading file for preview: %v", err)
	}

	fmt.Println("Preview of saved data:")
	fmt.Println(string(data)[:length])
	return nil
}

func main() {
	currentDate := currentDateString()
	outputFile := fmt.Sprintf("./body_%s.csv", currentDate)

	columnData, err := readExcelColumn(FILE_PATH, COLUMN, START_ROW, END_ROW)
	if err != nil {
		log.Fatalf("Failed to read Excel column: %v", err)
	}

	if len(columnData) == 0 {
		log.Println("No data to process. Check the Excel file and column specification.")
		return
	}

	processedData := processColumnData(columnData)

	if err := saveToCSV(processedData, outputFile); err != nil {
		log.Fatalf("Failed to save to CSV: %v", err)
	}

	if err := previewFile(outputFile, PREVIEW_LENGTH); err != nil {
		log.Fatalf("Failed to preview file: %v", err)
	}
}

 

3. 읽어온 데이터를 바탕으로 다수의 API 요청 보내기

WaitGroup을 사용해서 동시성을 활용해 API를 병렬적으로 실행합니다.

동시 작업 추적

WaitGroup은 완료되어야 할 고루틴(goroutine)의 수를 추적합니다. 이는 여러 고루틴이 병렬로 실행될 때 유용합니다.

작업 완료 대기

Main Go routine 이 모든 작업이 완료될 때까지 기다릴 수 있게 해줍니다. 이를 통해 모든 병렬 작업이 끝날 때까지 프로그램이 종료되지 않도록 합니다.

카운터 메커니즘

WaitGroup은 내부적으로 카운터를 사용합니다. 이 카운터는 다음 메서드들로 조작됩니다:

- Add(delta int): 대기해야 할 고루틴의 수를 증가시킵니다.
- Done(): 고루틴 하나가 완료되었음을 알립니다 (내부적으로 Add(-1)과 동일).
- Wait(): 카운터가 0이 될 때까지 블록합니다.

그리고 1번 요청을 먼저 했다고 해도 30번째 요청의 응답이 먼저 들어올 수 있으므로 index를 지정해서 모든 요청이 완료된 순간, index를 기준으로 오름차순 정렬을 하게 했습니다.

이로써, 동시성 프로그래밍을 활용하여 30개의 API 요청 시 응답시간을 270s -> 9s로 (1개 요청 시 보통 9s) 시간을 단축하였습니다.

package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"sort"
	"strconv"
	"sync"
	"time"

	"go.uber.org/zap"

	"go.uber.org/zap/zapcore"
)

// 상수 정의
const (
	INPUT_FILE        = "body_20240714.csv"          // 입력 CSV 파일 이름
	API_URL           = "http://localhost:8000/api/v1/xxxx"   // API URL
	ASSISTANT_CONTENT = "" // 
	DATE_FORMAT       = "20240712"                            // 날짜 형식
	OUTPUT_CSV_FILE   = "response_20240712.csv"          // 출력 CSV 파일 이름
)

// CSV 파일 읽기 함수
func readCSV(filename string, logger *zap.Logger) ([]string, error) {

	logger.Info("Reading CSV file", zap.String("filename", filename)) // CSV 파일 읽기 시작 로그
	file, err := os.Open(filename)
	if err != nil {
		return nil, fmt.Errorf("Error opening file: %w", err) // 파일 열기 오류 시 에러 반환
	}
	defer file.Close()

	reader := csv.NewReader(file)
	var data []string
	for {
		record, err := reader.Read()
		if err != nil {
			break // 더 이상 읽을 레코드가 없으면 종료
		}
		if len(record) > 0 {
			data = append(data, record[0]) // CSV 레코드의 첫 번째 필드만 저장
		}
	}
	logger.Info("Successfully read records from CSV file", zap.Int("records", len(data))) // 성공적으로 읽은 레코드 수 로그
	return data, nil
}

// API 요청 함수
func makeAPIRequest(userContent string, wg *sync.WaitGroup, results chan<- [2]string, index int, logger *zap.Logger) {
	defer wg.Done() // 작업이 완료되면 WaitGroup의 작업 카운터를 줄임

	logger.Info("Making API request", zap.Int("index", index), zap.String("userContent", userContent)) // API 요청 시작 로그
	startTime := time.Now()                                                                            // 요청 시작 시간 기록

	requestData := []map[string]string{
		// 원하는 데이터 요청 형식
	}
	jsonData, err := json.Marshal(requestData)
	if err != nil {
		results <- [2]string{fmt.Sprintf("Error marshalling JSON: %v", err), fmt.Sprintf("%d", index)}
		logger.Error("Error marshalling JSON", zap.Int("index", index), zap.Error(err)) // JSON 변환 오류 로그
		return
	}

	req, err := http.NewRequest("POST", API_URL, bytes.NewBuffer(jsonData))
	if err != nil {
		results <- [2]string{fmt.Sprintf("Error creating request: %v", err), fmt.Sprintf("%d", index)}
		logger.Error("Error creating request", zap.Int("index", index), zap.Error(err)) // HTTP 요청 생성 오류 로그
		return
	}
	req.Header.Set("Content-Type", "application/json") // 요청 헤더 설정

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		results <- [2]string{fmt.Sprintf("Error making API request: %v", err), fmt.Sprintf("%d", index)}
		logger.Error("Error making API request", zap.Int("index", index), zap.Error(err)) // API 요청 오류 로그
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		results <- [2]string{fmt.Sprintf("Error reading response: %v", err), fmt.Sprintf("%d", index)}
		logger.Error("Error reading response", zap.Int("index", index), zap.Error(err)) // 응답 읽기 오류 로그
		return
	}

	var response map[string]interface{}
	if err := json.Unmarshal(body, &response); err != nil {
		results <- [2]string{fmt.Sprintf("Invalid response format: %s", string(body)), fmt.Sprintf("%d", index)}
		logger.Error("Invalid response format", zap.Int("index", index), zap.String("body", string(body))) // 응답 형식 오류 로그
		return
	}

	if output, ok := response["output"].(string); ok {
		results <- [2]string{output, fmt.Sprintf("%d", index)}
		duration := time.Since(startTime)                                                                                                   // 요청 소요 시간 계산
		logger.Info("Received valid response", zap.Int("index", index), zap.String("response", output), zap.Duration("duration", duration)) // 유효한 응답 로그 및 소요 시간 기록
	} else {
		results <- [2]string{"No output field in response", fmt.Sprintf("%d", index)}
		logger.Warn("No output field in response", zap.Int("index", index)) // 응답에 output 필드가 없음 로그
	}
}

// CSV 파일에 데이터 저장 함수
func saveToCSV(filename string, data [][2]string, logger *zap.Logger) error {
	logger.Info("Saving data to CSV file", zap.String("filename", filename)) // CSV 파일 저장 시작 로그
	file, err := os.Create(filename)
	if err != nil {
		return fmt.Errorf("Error creating CSV file: %w", err) // 파일 생성 오류 시 에러 반환
	}
	defer file.Close()

	writer := csv.NewWriter(file)
	defer writer.Flush()

	for _, value := range data {
		if err := writer.Write([]string{value[0]}); err != nil {
			return fmt.Errorf("Error writing record to CSV file: %w", err) // 레코드 작성 오류 시 에러 반환
		}
	}

	logger.Info("Successfully saved data to CSV file") // 데이터 저장 성공 로그
	return nil
}

func fileLogger(filename string) *zap.Logger {
	config := zap.NewProductionEncoderConfig()
	config.EncodeLevel = zapcore.CapitalColorLevelEncoder
	config.EncodeTime = zapcore.ISO8601TimeEncoder
	fileEncoder := zapcore.NewJSONEncoder(config)
	consoleEncoder := zapcore.NewConsoleEncoder(config)
	logFile, _ := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	writer := zapcore.AddSync(logFile)
	defaultLogLevel := zapcore.DebugLevel
	core := zapcore.NewTee(
		zapcore.NewCore(fileEncoder, writer, defaultLogLevel),
		zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), defaultLogLevel),
	)

	logger := zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))

	return logger
}

func main() {
	filename := "logs.log"
	logger := fileLogger(filename)

	defer logger.Sync()

	var startIndex, endIndex int
	var err error

	// 터미널에서 시작 인덱스와 끝 인덱스를 입력받음
	fmt.Print("startIndex(0~184): ") // 0 -> 2
	_, err = fmt.Scan(&startIndex)
	if err != nil {
		logger.Fatal("Error reading start index", zap.Error(err)) // 시작 인덱스 읽기 오류 시 프로그램 종료
	}

	fmt.Print("endIndex(1~185): ")
	_, err = fmt.Scan(&endIndex)
	if err != nil {
		logger.Fatal("Error reading end index", zap.Error(err)) // 끝 인덱스 읽기 오류 시 프로그램 종료
	}

	// CSV 파일에서 질문 읽기
	questions, err := readCSV(INPUT_FILE, logger)
	if err != nil {
		logger.Fatal("Error reading CSV file", zap.Error(err)) // CSV 파일 읽기 오류 시 프로그램 종료
	}

	// 끝 인덱스가 0이거나 질문 수보다 큰 경우 질문 수로 설정
	if endIndex == 0 || endIndex > len(questions) {
		endIndex = len(questions)
	}

	var wg sync.WaitGroup
	results := make(chan [2]string, endIndex-startIndex)

	// 시작 인덱스부터 끝 인덱스까지 API 요청 생성
	for i := startIndex; i < endIndex; i++ {
		wg.Add(1)                                                // WaitGroup 카운터 증가
		go makeAPIRequest(questions[i], &wg, results, i, logger) // 고루틴으로 API 요청 실행
	}

	wg.Wait()      // 모든 고루틴이 완료될 때까지 대기
	close(results) // 채널 닫기

	var responseData [][2]string
	for response := range results {
		responseData = append(responseData, response) // 응답 데이터 수집
	}

	// 응답 데이터를 index 기준으로 오름차순 정렬
	sort.Slice(responseData, func(i, j int) bool {
		indexI, _ := strconv.Atoi(responseData[i][1])
		indexJ, _ := strconv.Atoi(responseData[j][1])
		return indexI < indexJ
	})

	// 정렬된 데이터의 텍스트 내용만 CSV 파일에 저장
	var sortedTextData [][2]string
	for _, value := range responseData {
		sortedTextData = append(sortedTextData, [2]string{value[0]})
	}

	// 응답 데이터를 CSV 파일에 저장
	if err := saveToCSV(OUTPUT_CSV_FILE, sortedTextData, logger); err != nil {
		logger.Fatal("Error saving to CSV", zap.Error(err)) // CSV 저장 오류 시 프로그램 종료
	}

	logger.Info("Processing complete", zap.String("outputFile", OUTPUT_CSV_FILE)) // 처리 완료 로그
	fmt.Printf("Processing complete. Results saved to %s\n", OUTPUT_CSV_FILE)     // 처리 완료 메시지 출력
}

 

읽어주셔서 감사합니다 🙇‍♂️

728x90

Golang으로 백엔드 개발 도전기

Go 언어

 

1. Intro

간단한 백엔드를 만들일이 있어서 고민을 하다가, 유튜브 알고리즘에 뜬 아래 두 동영상을 보고 Golang Backend를 공부해야겠다고 다짐했습니다. 약 4년전, 공군 개발병 복무 시절, Golang + go gin 으로 간단한 REST API를 사지방에서 만들어보고 초당 몇번의 요청이 가능한지 테스트해보고 빨라서 놀랐던 기억이 있었는데, 그 시절이 떠오르네요.
[Golang 도입, 그리고 4년 간의 기록 / Golang과 함께 서버 레이턴시를 500배 개선한 후기] 
https://www.youtube.com/watch?v=75X_eBW0mog

https://www.youtube.com/watch?v=NVqVS64qClk

2. 왜 Golang인가?

간단한 백엔드 개발을 위한 언어를 선택하는 과정에서 가장 중요하게 생각한 것은 성능이 빠른지와 단순한지였습니다. Golang은 다음과 같은 매력적인 장점들이 있습니다.

빠른 컴파일 속도
Golang의 빠른 컴파일 속도는 개발 생산성을 크게 향상시킵니다. 공군 시절 Spring 프로젝트를 수정하고 컴파일 하는데 1분 가까이 걸려서 멍때렸던 기억이 있었는데, 아직 경험해보지 못했지만 go는 프로젝트가 커져도 빠른 컴파일이 된다고 합니다.
뛰어난 동시성 처리
Go routine과 채널을 통한 동시성 제어는 Golang의 큰 장점 중 하나입니다. 요즘 Sass(Software as a service)가 많아지면서 Third party app(GPT etc..)과 API 연동할 일이 많은데 이때 강점을 가집니다.
언어의 단순성
간결하고 읽기 쉬운 문법으로 빠르게 학습할 수 있습니다. 사실 다른 빠른 언어들도 있지만, 개인적으로 go 언어가 이해가 잘되었습니다.
성능 최적화
메모리와 CPU 사용을 쉽게 최적화할 수 있다고 합니다. 고성능 애플리케이션 개발에 많이 사용된다고 합니다.

물론, Golang에도 몇 가지 단점이 있습니다..

에러 처리의 복잡성
모든 에러를 명시적으로 처리해야 해서 코드가 길어질 수 있습니다.
GC 관련 이슈
대규모 시스템에서는 가비지 컬렉션으로 인한 성능 저하가 발생할 수 있습니다.
제네릭 지원 부족
제네릭 기능의 부재로 인해 코드 재사용성이 떨어질 수 있습니다.
특정 패턴 강제
에러 처리 등에서 특정 패턴을 따라야 하는 경우가 많습니다.

3. 선택한 기술 스택

Golang 커뮤니티를 탐색하면서 많은 개발자들이 추천하거나 github star가 많은 라이브러리를 찾았습니다. 제가 선택한 기술 스택은 다음과 같습니다:

Go Gin
경량화되고 빠른 웹 프레임워크로, RESTful API 개발에 적합합니다.
SQLC
SQL 쿼리를 Go 코드로 변환해주는 도구로, 타입 안정성과 성능을 모두 잡을 수 있습니다.
PostgreSQL with RDS
요즘 많이 쓰는 관계형 데이터베이스입니다.
Docker
개발 환경의 일관성을 유지할 수 있습니다.
Github Action CI/CD
easy..!

4. 특이했던 점

NestJS에서 사용했었던 schema.prisma처럼 migration 폴더에 데이터베이스 정보(PK, FK, Constraint, relation, type 등)를 기술하는 것. @Controller, @Service 같은 어노테이션이 없는것. pointer star (*), & 등 C 같은 느낌. 특이한 for문 등. 내일은 AWS RDS 데이터베이스 만들고, 연결해봐야겠다. 그리고 메모장같은 간단한 CRUD를 만들어보고 GPT나 Claude API 연동 시도해봐야겠다..! 

 
읽어주셔서 감사합니다 😊
 
 

+ Recent posts