이번 글에서는 업무에서 사용하면서 Spark 성능을 높이기 위해서 적용한 튜닝포인트들을 정리하도록 하겠다. Spark를 사용하면서 느꼈던 것은 쉽게 사용하려면 얼마든 쉽게 사용하겠지만 기본적인 작동원리를 잘 알고 사용한다면 2~3배의 성능향상을 얻을 수 있다는 것을 알게 되었다.
1. coalesce와 repartition의 차이. - shuffle 줄이기
업무를 진행하면서 Spark잡으로 ETL 프로세스를 진행하다 보면 파일이 너무 많이 생성되기 때문에 이를 해결하기 위해 repartition(1)의 옵션을 주어서 파일의 개수를 줄이는 경우가 있다.
이 경우 coalesce 함수를 사용해야지만 shuffle(excutor간에 데이터 재분배하는 방법으로 성능에 영향을줌)이 일어나지 않기 때문에 파일 개수를 줄이는 경우라면 coalesce 함수를 사용하는 것이 좋다.
추가로 꼭 1개의 파일 개수를 만들어야 하는 경우가 아니라면 spark job의 numexcutor수와 파일 개수를 맞춰주는 것이 성능향상에 도움이 된다.
실제로 대용량 write 작업 시에 coalesce(1)로 numexcutor 15개로 작업을 처리하는 것보다는 numexcutor 10개로 줄이고 coalesce(10)개로 하는 것이 성능이 더 뛰어났다.
이는 병렬처리시 파일 개수가 1개면 성능이 떨어지는 것이 원인으로 추정? 된다.
2. transformation 함수와 action함수의 차이를 잘 알고 사용하기.
spark의 함수는 크게 변환작업에 사용되는 transformation함수와 transformation 함수를 실행시키는 action함수가있는데 이를 잘 알지 못하면 아래와 같은 상황이 발생하여 성능이 크게 떨어진다.
1
2
3
4
5
6
7
categories=['a','b','c']
df = predf.filter((predf.category).isin(categories))
for category in categories:
df=df.filter(df.category==f"{category}")
df.write(Path)
위의 코드에서 문제점은 필터를할때는 액션이 발생하지 않아 for문이 수행되면서 첫번째 필터와 for문안에 있는 필터함수가 같이 수행되어 매번 모든 데이터를 읽어 필터링해 성능이 크게 떨어진다.
이런경우 아래와 같이 해결할 수 있다.
1
2
3
4
5
6
7
categories=['a','b','c']
df = predf.filter((predf.category).isin(categories))
df = df.cache()
for category in categories:
df=df.filter(df.category==f"{category}")
df.write(Path)
df.cache함수만 추가해주어도 for문이 수행하기전에 dataframe을 만드는 action이 발생하고 필터링된 데이터를 for문에서 읽기 때문에 성능이 향상된다.
3. cache 함수와 persist 함수 사용하기.
2번의 경우에 cache함수를 사용하여 성능 향상을 했기 때문에 이번에는 cache함수와 persist함수의 차이를 설명한다.
우선 cache함수는 메모리에 데이터프레임을 모두 올려서 사용하는 함수이며 persist함수는 storeage레벨을 인자로 받아 적절하게 데이터프레임을 메모리 또는 디스크에 써서 사용하는 함수이다.
persist함수의 MEMORY_ONLY인자는 cache와 같은 함수이다.
- persist함수의 Storeage레벨 종류
4. skew가 일어나지 않도록 데이터를 균등하게 나눌 수 있는 컬럼을 분할 컬럼으로 지정하기.
skew(데이터가 한쪽으로 쏠리는 현상)가 일어나면 분산 처리 시에 성능이 크게 떨어지기 때문에 spark 작업뿐만 아니라 대부분의 분산 처리 시에도 유의해야 할 점이다.