Shuffle 이란?
Spark 는 여러 노드들에 데이터를 분산하여 병렬로 Task 를 수행하는데, 현재 분산되어 있는 구조로 수행하지 못하는 작업이 발생하면 특정 동작을 통해 데이터를 노드에 재분배하는 작업을 거친다. 이를 Shuffle 이라고 한다.
예를들어 정렬을 한다고 해보자. 정렬은 모든 데이터를 순서에 맞춰 봐야하기 떄문에 특정 순서에 맞게 재배치하는Shuffle 이 수행된다.
그럼 Count 는? 특정 데이터의 종류를 세는 연산은 각 파티션에서 병렬로 수행 후 취합하면 된다. 즉 독립적으로 처리할 수 있는 작업이므로 Shuffle이 수행되지 않는다.
이 외에 groupByKey, reduceByKey, join 등에선 shuffle 발생하고, fillter, map 등에선 shuffle이 발생하지 않는다.
이를 Wide Tranformation(셔플 발생), Narrow Transformation(셔플 미발생) 이라고 한다.
Shuffle 의 개념만 알고자하면 위 문단만 봐도 무방하다. 아래는 좀 더 내부적인 동작을 말하려 한다.
Partition
Spark 하나의 Job을 수행하게 되면 파티션으로 나눠 병렬로 작업을 진행한다.
병렬로 실행하는 작업은 각각 파티션으로 단위로 진행된다. 해당 파티션은 여려 노드에 분배되어 있다.
+ 참고로 동시에 실행되는 작업 수는 Core 만큼 실행되며, 그 이상의 파티션을 만들어도 동시 실행되는 파티션 이외 작업은 큐잉 되어 있다.
이쯤이면 감이 올 수 있다. Spark 성능 최적화의 핵심은 노드 간 데이터 이동을 최소화하고, 연산을 진행하는 것 이다
방법은 브로드캐스트(작은 데이터는 미리 전체분배), 쿼리 최적화 등 여러 방식이 존재한다.
Partition을 조정하고 싶다면?
Wide 연산 후 데이터가 고르게 분포되어있지 않거나, 파티션이 너무 쪼개져 있거나 등 여러 이유로 파티션을 조정하고 싶다면
Repartition, Coalesce 두가지를 사용하게 된다.
Repartition 은 Wide Tranformation 으로 파티션 크기를 Up/Down 하는데 사용한다.
Coalesce 은 우선 Narrow Tranformation 으로 주로 파티션 크기를 Down 하는데 사용한다. (무조건 Shuffle을 안함은 아니고 최소화를 목적으로 한다.)
예시코드
# 불균등 데이터 생성
data_skewed = [(i,) for i in range(1, 900001)] + [(i,) for i in range(900001, 1000001)] * 10
df_skewed = spark.createDataFrame(data_skewed, ["value"])
# 파티션 수 확인
print("Uniform Data Partitions:", df_uniform.rdd.getNumPartitions())
만약 해당 파티션을 4로 줄이는 작업이 필요 한다고 가정하자.
데이터가 모두 균등하다면, Repartition 또한 이동량이 적을 수 있어 나눠서 접근해봤다.
Repartition
df_skewed.repartition(4)
df_skewed.count() # 작업트리거
df_skewed.coalesce(4)
df_skewed.count() # 작업트리거
실행 커맨드와 해당 작업의 Spark UI 이다. Coalesce 의 Shuffle 작업과 비교해 Repartition의 Shuffle 작업이 크게 일어남을 볼 수 있다.
참고자료
Why Shuffle is best served as External Service in Spark
Shuffle?
medium.com
https://tech.kakao.com/posts/461
Spark Shuffle Partition과 최적화 - tech.kakao.com
안녕하세요. 카카오 데이터PE셀(응용분석팀)의 Logan입니다. 응용분석팀에서 식...
tech.kakao.com
'IT 기술 > 데이터엔지니어링' 카테고리의 다른 글
k8s에 Airflow helm 으로 배포하기 - dags,logs 폴더 마운트 (0) | 2025.02.26 |
---|---|
[Spark] Structured Streaming으로 실시간 데이터 처리하기 - Databricks 활용 (0) | 2024.12.16 |
[Spark] Spark 실행 방식 - SparkDriver, ClusterManager (1) | 2024.12.05 |
[Spark]UDF 의 정의 및 사용 방법 + UDF 별 수행 속도차이(PySpark)- 24.11 (1) | 2024.11.27 |
[Spark] Databricks Community Edition 무료로 사용하기 -2024.11 (1) | 2024.11.20 |