이전 구축되어있던 로깅 파이프라인은 FluentD를 통해 Opensearch에 로그를 쌓고 있었고 개인정보관련 로그는 인덱스를 날짜 별로 분리하여 따로 관리하고 있었다. 개인정보관련해서 최소 5년 보관해야하는 규약이 있었고 시간이 지남에 따라 용량이 쌓여 비용이 증가하였고 인덱스가 생성되지 않는 문제가 발생했다.
가장 좋은 선택지는 UltraWarm 노드와 Cold Storage를 활성화해서 인덱스를 다른 계층에 저장하는 것이 비용과 관리에 효율적이라고 생각한다. 하지만 이미 구축되어 제공되던 로깅 서비스에 문제를 해결하기 위해 S3로 오래된 날짜 별 인덱스의 적재된 로그를 이관하는 스크립트를 기록한다.
신규 구축 과정에서 해당 문제를 겪고 opensearch의 datastream을 활용하여 로깅을 구성하고 각 계층(Hot, Warm, Cold)을 어떻게 활용하였고, 인덱스를 Rollover했던 과정은 다음 포스팅에 기록하겠다.
하기 스크립트는 특정 연도-월(year-month)에 해당하는 날짜별 데이터를 Elasticsearch에서 조회하여 CSV 파일로 저장하고, 모든 CSV 파일을 압축한 후 AWS S3 버킷에 업로드하는 작업을 자동화한 것이다. ES 쿼리 부분은 상황에 맞게 다시 설계하고 각 스텝 . 주석을 확인하고 밑에 설명을 참고하면 될 듯 하다.
#!/bin/bash
# 전달된 인자 확인
if [ "$#" -ne 1 ]; then
echo "Usage: $0 {year-month}"
exit 1
fi
YEAR_MONTH=$1
START_DATE="${YEAR_MONTH}-01"
END_DATE=$(date -d "$START_DATE +1 month -1 day" +%Y-%m-%d)
S3_BUCKET_NAME="S3 buket name"
# 기본 Elasticsearch 쿼리 템플릿 / 각 상황(인덱스)에 맞도록 쿼리 설계
ES_QUERY_TEMPLATE='{
"_source": ["@timestamp", "project", "loginId", "client_ip", "referer", "message"],
"query": {
"bool": {
"must": [
{
"bool": {
"should": [
{"wildcard": {"referer": "*domain*"}},
{"wildcard": {"referer": "*domain*"}}
]
}
},
{"wildcard": {"referer": "*main*"}},
{
"bool": {
"should": [
{"term": {"project": "projectname"}},
{"term": {"project": "projectname"}}
]
}
}
],
"must_not": [
{"wildcard": {"referer": "*login*"}},
{"wildcard": {"referer": "*popup*"}},
{"wildcard": {"message": "*update*"}},
{"wildcard": {"message": "*insert*"}},
{"wildcard": {"message": "*COUNT*"}}
]
}
},
"sort": [
{"@timestamp": {"order": "asc"}}
],
"track_total_hits": true,
"size": 1000,
"from": FROM_PLACEHOLDER
}'
# 날짜별로 데이터 가져오기
CURRENT_DATE=$START_DATE
while [ "$CURRENT_DATE" != "$END_DATE" ]; do
# 날짜 형식 변환
FORMATTED_DATE=$(date -d "$CURRENT_DATE" +%Y%m%d)
CSV_FILE="prod.privacy-${CURRENT_DATE}.csv"
FROM=0
BATCH_SIZE=1000
TOTAL_HITS=1
# Elasticsearch URL 설정
ES_URL="es 주소"
# CSV 헤더 작성
echo '"@timestamp","project","loginId","client_ip","referer","message"' > $CSV_FILE
while [ $FROM -lt $TOTAL_HITS ]; do
# Elasticsearch 쿼리 생성
ES_QUERY=$(echo $ES_QUERY_TEMPLATE | sed "s/FROM_PLACEHOLDER/$FROM/")
# Elasticsearch에서 데이터 조회
RESPONSE=$(curl -s -u admin:password -X POST -H 'Content-Type: application/json' -d "$ES_QUERY" "$ES_URL")
# 전체 히트 수 가져오기 (최초 한 번만)
if [ $FROM -eq 0 ]; then
TOTAL_HITS=$(echo $RESPONSE | jq '.hits.total.value')
echo "Processing $CURRENT_DATE: Total hits: $TOTAL_HITS"
fi
# 데이터가 존재하는 경우 처리
HITS_COUNT=$(echo $RESPONSE | jq '.hits.hits | length')
if [ $HITS_COUNT -eq 0 ]; then
break
fi
# jq를 사용하여 JSON 데이터 파싱 및 CSV 파일에 추가
echo $RESPONSE | jq -r '.hits.hits[] | [.["_source"]["@timestamp"], .["_source"]["project"], .["_source"]["loginId"], .["_source"]["client_ip"], .["_source"]["referer"], .["_source"]["message"]] | @csv' >> $CSV_FILE
# 다음 배치를 위해 FROM 증가
FROM=$((FROM + BATCH_SIZE))
done
# 다음 날짜로 이동
CURRENT_DATE=$(date -d "$CURRENT_DATE +1 day" +%Y-%m-%d)
done
# 모든 파일을 압축
ZIP_FILE="prod.privacy-${YEAR_MONTH}.zip"
zip $ZIP_FILE prod.privacy-${YEAR_MONTH}-*.csv
# ZIP 파일을 S3에 업로드
aws s3 cp $ZIP_FILE s3://$S3_BUCKET_NAME/
# CSV 파일 삭제
rm prod.privacy-${YEAR_MONTH}-*.csv
# 완료 메시지
echo "CSV 파일이 성공적으로 생성되고 압축되었습니다: $ZIP_FILE"
- 인자 확인:
- 스크립트가 실행될 때 year-month 형식의 인자(예: 2023-10)가 전달되었는지 확인한다.
- 인자가 없으면 사용법을 안내하고 종료한다.
- 변수 설정:
- YEAR_MONTH 변수에 인자를 저장하고, 그에 따른 START_DATE와 END_DATE를 계산한다. START_DATE는 해당 월의 첫날, END_DATE는 마지막 날로 설정한다.
- S3 버킷 이름을 설정한다.
- Elasticsearch 쿼리 템플릿 작성:
- ES_QUERY_TEMPLATE 변수를 통해 Elasticsearch 쿼리 템플릿을 설정한다. 각 환경에 맞는 쿼리 템플릿을 작성하면 된다. 필자의 경우 쿼리는 아래와 같은 조건을 갖게 된다.
- 특정 도메인(*domain*, *domain2*)에서의 referer와 특정 project 값이 일치하는 데이터를 조회
- 특정 문자열(login, popup, update, insert, COUNT)을 포함하지 않는 조건
- 조회된 결과는 @timestamp 기준으로 오름차순 정렬한다.
- 페이지네이션을 위해 FROM_PLACEHOLDER라는 자리 표시자가 있어 이후에 실제 조회 오프셋을 넣도록 설정다.
- ES_QUERY_TEMPLATE 변수를 통해 Elasticsearch 쿼리 템플릿을 설정한다. 각 환경에 맞는 쿼리 템플릿을 작성하면 된다. 필자의 경우 쿼리는 아래와 같은 조건을 갖게 된다.
- 날짜별 데이터 조회 및 CSV 파일 생성:
- 날짜별(CURRENT_DATE)로 루프를 돌며 Elasticsearch에 요청을 보내고 해당 날짜의 데이터를 조회하여 CSV 파일로 저장한다.
- 조회되는 Elasticsearch 인덱스 URL(ES_URL)은 날짜(FORMATTED_DATE)에 맞춰 동적으로 설정다.
- 첫 번째 조회 시, TOTAL_HITS 값에 조회되는 총 데이터 개수를 저장해 전체 데이터의 조회가 끝날 때까지 반복한다.
- Elasticsearch 데이터 조회 및 CSV 저장:
- curl을 사용하여 Elasticsearch에 데이터를 조회하고, jq를 사용해 조회된 JSON 데이터를 CSV 형식으로 변환하여 파일에 저장다.
- 데이터를 모두 조회할 때까지 반복한다(FROM 변수를 BATCH_SIZE만큼 증가시키며 페이지네이션).
- CSV 파일 압축:
- 해당 월의 CSV 파일을 하나의 ZIP 파일로 압축다.
- S3 업로드:
- AWS CLI(aws s3 cp)를 사용하여 ZIP 파일을 S3 버킷에 업로드다.
- 로컬 CSV 파일 정리:
- 업로드가 끝난 후 개별 CSV 파일은 삭제해 로컬 공간을 정리한다.
- 완료 메시지 출력:
- 최종적으로 압축 파일 생성 및 업로드 완료 메시지를 출력하여 작업이 끝났음을 알린다.
해당 방법으로 특정 인덱스를 조회하여 S3에 데이터를 수동으로 올리는 작업을 스크립트로 자동화하였지만 이 방법보다 초반 설계할 때 UltraWarm 노드와 Cold Storage를 활성화하여 데이터를 마이그레이션 하는 것이 R 패밀리와 비교하여 최대 90% 이상 비용을 절약할 수 있다고 한다. 물론 Cold Storage의 데이터를 읽기 위해서는 다시 UltraWarm 층에 마이그레이션 해야한다고 하지만 로그의 경우는 일주일이 넘어가면 조회하는 횟수가 많지 않다고 판단했다. UltraWarm 노드의 경우도 초반 조회할 때 살짝 늦는 것을 제외하고는 Hot node와 성능 체감이 심하게 될 정도는 아닌 것 같아 현재 환경에서는 큰 문제가 없다고 생각한다.
하기 영상은 Amazon Opensearch에 대한 내부구조와 최적화에 대한 영상인데 이를 참고하면 도움이 될 것 같아 남긴다.
https://www.youtube.com/watch?v=e9GpbaT18Mk&t=1624s
따라서 관련된 글은 다음 포스팅에서 확인해보도록 하자.
https://sh970901.tistory.com/159
'IT' 카테고리의 다른 글
쿠버네티스 환경 JAVA heap 참고 글 정리 (5) | 2024.10.29 |
---|---|
AWS Opensearch Datastream 및 ISM (3) | 2024.10.26 |
쿠버네티스[EKS] Kiali 설치 및 기존 Prometheus 활용 (2) | 2024.10.09 |
쿠버네티스[EKS] Istio 설치 및 적용 [istioctl, helm, operator] (3) | 2024.10.09 |
EKS worker node SSM 접속 (0) | 2024.10.09 |