TechBooks
[Spark] 스파크 완벽 가이드 #2장
꿈나무 김땡땡
2022. 5. 6. 15:15
책 소개
제목 : 스파크 완벽 가이드
저자 : 빌 체임버스, 마테이 자하리아
https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=175546079
스파크 완벽 가이드
스파크 창시자가 알려주는 스파크 활용과 배포, 유지 보수의 모든 것. 오픈소스 클러스터 컴퓨팅 프레임워크인 스파크의 창시자가 쓴 스파크에 대한 종합 안내서다. 스파크 사용법부터 배포, 유
www.aladin.co.kr
주요 주제
- 클러스터란?
- 스파크 애플리케이션 - 드라이버/ 익스큐터/ 클러스터 매니저
- 언어 - 스칼라/자바/파이썬/R/SQL
- API - 고수준/저수준
- 시작하기 - SparkSession
- SparkSession - spark
- DataFrame - partition
- Transformation - narrow dependency/ wide dependency(shuffle)/ Lazy Evaluation
- Action
- Spark UI
- 예제 - explain, read, sort, createOrReplaceTempView
명령어
- explain
- read
- sort
- createOrReplaceTempView
2.1 스파크의 기본 아키텍처
- 컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용할 수 있게 해준다.
- 스파크는 클러스터에서 작업을 조율하고 데이터를 처리할 수 있는 프레임워크
- 스파크가 연산에 사용하는 클러스터는 아래와 같은 클러스터 매니저가 관리: 클러스터 매니저에 애플리케이션 submit → 매니저가 애플리케이션 실행에 필요한 자원 할당 → 할당받은 자원으로 우리는 작업 처리
- Spark standalone 클러스터 매니저
- 하둡 YARN
- Mesos
- 하둡: 저장용 / 스파크: 분석용
2.1.1 스파크 애플리케이션
- 스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용한다.
- 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있다.
- 1개의 드라이버 프로세스 + N개의 익스큐터(executor) 프로세스로 구성
- 드라이버 프로세스 (SparkSession) - 명령
- 클러스터 노드 중 하나에서 실행 됨
- main() 함수 실행
- 스파크 애플리케이션 정보의 유지 관리, 사용자 프로그램/입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포, 스케쥴링
- 애플리케이션의 수명 주기 동안 관련 정보 모두 유지 (일종의 심장 역할)
- 스파크의 언어 API를 통해 다양한 언어로 실행 가능
- 익스큐터 프로세스 - 실행
- 1. 드라이버 프로세스가 할당한 작업 수행
- 2. 진행 상황을 다시 드라이버 노드에 보고1개의 드라이버 프로세스 + N개의 익스큐터(executor) 프로세스로 구성
- 드라이버 프로세스 (SparkSession) - 명령
- 클러스터 매니저
- 물리적 머신 관리. 스파크애플리케이션에 자원 할당
프로세스(process)
- 실행 중에 있는 프로그램 (Program)
- 스케쥴링의 대상이 되는 작업(task)과 같은 의미
- 프로세스 내부에는 최소 하나의 thread가 있음. 실제로는 thread 단위로 스케쥴링
2.2 스파크의 다양한 언어 API
- 스칼라
- 스파크는 스칼라로 개발 되어 있어 스칼라가 스파크의 기본 언어
- 자바
- 파이썬
- SQL
- R
- sparkR, sparklyr - 2개의 라이브러리 존재
- 사용자는 스파크 코드를 실행하기 위해 SparkSession 객체를 진입점으로 사용
- 파이썬, R로 사용할 때는 JVM 코드를 명시적으로 작성하지 않음
- 스파크가 사용자를 대신해 파이썬, R 코드 → JVM 코드로 변환
2.3 스파크 API
- 스파크는 기본적으로 2가지 API 제공
- 저수준의 비구조적(Unstructured) API
- 고수준의 구조적(Structured) API
2.4 스파크 시작하기
- SparkSession 실행(생성)
- 대화형 모드로 시작하면 SparkSession 자동 생성 됨
- standalone 애플리케이션으로 시작하면 사용자 애플리케이션 코드에서 SparkSession 객체 직접 생성 필요
- 실행 방법
- 대화형 세션: ./bin/spark-shell → 스파크 콘솔 접속
- 대화형 세션 파이썬: ./bin/pyspark → 파이썬 콘솔 접속
- 스탠드얼론 애플리케이션을 스파크에 제출: spark-submit
- https://github.daumkakao.com/corporate-data/spark_etl/blob/master/pyspark_submit.py
- 우리는 브레인클라우드 상에서 스탠드얼론 어플리케이션 → 주피터 노트북 실행?
2.5 SparkSession
- 스파크 애플리케이션은 SparkSession 이라 불리는 드라이버 프로세스로 제어
- SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
- 1개의 SparkSession은 1개의 스파크 애플리케이션에 대응
- 스칼라, 파이썬 콘솔 시작하면 spark 변수로 SparkSession 사용 가능
spark
---------
[scala console] res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession...
[python console] <pspark.sql.session.SparkSession at 0x7efda4c1ccd0>
- 일정 범위의 숫자 만들어보기
// scala
val myRange = spark.range(1000).toDF("number")
# python
myRange = spark.range(1000).toDF("number")
- number 컬럼에 0~999 로우 생성
- 클러스터 모드라면 이 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당 됨
2.6 DataFrame
- 가장 대표적인 구조적 API
- 데이터프레임은 테이블의 데이터를 로우와 컬럼으로 단순하게 표현
- Schema(스키마): 컬럼 & 컬럼의 타입을 정의한 목록
-
- python, R의 데이터프레임은 일반적으로 단일 컴퓨터에 저장 ↔ spark의 데이터프레임은 분산 컴퓨터에 저장
- python pandas dataframe, R dataframe → spark dataframe으로 쉽게 변환 가능
- python, R의 데이터프레임은 일반적으로 단일 컴퓨터에 저장 ↔ spark의 데이터프레임은 분산 컴퓨터에 저장
2.6.1 파티션
- 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 “파티션" 이라 불리는 청크 단위로 데이터 분할
- 파티션: DF 분할 저장 단위
- config에서 개수 설정
- 물리적 파티션에 데이터 변환용 함수를 지정하면 스파크가 실제 처리 방법 결정 (3장에서 자세히)
- 데이터프레임은 파티션을 수동으로 처리할 필요가 없음 (알아서 처리해줌)
2.7 트랜스포메이션
- 스파크의 핵심 데이터 구조: Immutable (불변성)
- 한번 생성하면 변경 불가
- Transformation: 데이터프레임을 변경하기 위해 변경 방법을 스파크에 알려주는 것
-
// df에서 짝수 찾기 //scala val divisBy2 = myRange.where("number % 2 = 0") # python divisBy2 = myRange.where("number % 2 = 0")
-
- 액션을 호출하기 전까지는 실제 트랜스포메이션 수행하지 않음
트랜스포메이션 유형
- 좁은 의존성 (Narrow Dependency)
- 하나의 입력 파티션이 하나의 출력 파티션에만 영향
- where, filter
- 스파크가 파이프라이닝(Pipelining) 자동으로 수행
- DF에 여러 필터를 지정하면 모든 작업이 메모리에서 발생
- 넓은 의존성 (Wide Dependency)
- 하나의 입력 파티션이 여러 출력 파티션에 영향
- groupby, join, sort
- shuffle : 스파크가 클러스터에서 파티션 교환
- 셔플의 결과를 디스크에 저장
- ‘셔플 최적화’ 중요한 주제!
2.7.1 지연 연산(Lazy Evaluation)
- 지연 연산: 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
- 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 **‘실행 계획'**을 생성
- 코드를 실행하는 마지막 순간까지 대기 → 원형 DF 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
- 이 과정을 거치며 전체 데이터 흐름 최적화 !!
- 예시: DF의 조건절 푸시다운(Predicate Pushdown)
- 복잡한 스파크 잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있을 때: 필요한 레코드 1개만 읽는 것이 가장 효율적
- 이 필터 계산 작업을 데이터소스(DB)에 위임 → 스파크는 하나의 레코드만 받음 → 처리에 필요한 자원 최소화: 최적화 작업 자동으로 수행
- 복잡한 스파크 잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있을 때: 필요한 레코드 1개만 읽는 것이 가장 효율적
2.8 액션
- 액션(Action): 실제 연산을 수행하는 명령
- Transformation으로는 논리적 실행 계획 세우기
- Action으로 실제 연산 수행
- 액션을 지정하면 스파크 잡(job) 시작
- 액션 1개 실행할 때마다 잡 1개 시작
- Driver → Job → Stage → Task
- 셔플이 발생하는 지점을 기준으로 스파크 잡 하나를 여러 stage 로 나눈다.
- 파티션 개수에 따라 task 만들어짐 (task의 수 = partition의 수) → executor에 배분
- 필터(좁은 트랜스포메이션) 수행 후 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션) → 각 언어에 적합한 네이티브 객체에 결과 모으기
- 스파크 UI로 클러스터에서 실행중인 스파크 잡 모니터링 가능
2.9 스파크 UI
- 스파크 UI는 스파크 잡의 진행 상황을 모니터링할 때 사용
- 드라이버 노드의 4040 포트로 접속
- 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보를 확인 가능
- 스파크 잡 튜닝 & 디버깅 할 때 매우 유용
2.10 종합 예제
- 데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽음
- 예제는 스파크 Dataframe의 스키마 정보를 알아내는 스키마 추론(schema inference) 기능 사용
-
flightData2015 = spark\ .read\ .option("inferSchema", "true")\ .option("header", "true")\ .csv("/data/flight-data/csv/2015-summary.csv")
- DataFrame은 불특정 다수의 row, column을 가짐
- row 수를 알 수 없는 이유는 데이터를 읽는 과정이 lazy-evaluation 형태의 transformation이기 때
- flightData2015.take(3) 을 통해 head 명령과 같은 결과 확인 가능
- explain() 을 통해 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획 확인 가능
- 예시: flightData2015.sort(”count”).explain()
- 최종 결과는 가장 위에, 데이터 소스는 가장 아래에 있음
- 예시: flightData2015.sort(”count”).explain()
- 트랜스포메이션 실행 계획을 시작하기 위해 액션을 호출
- 액션 실행을 위한 설정
- spark.conf.set(”spark.sql.shuffle.partitions”, “5”)
- 스파크 셔플 시 기본적으로 200개의 셔플 파티션 생성 → 5로 설정해 출력 파티션 수 줄이기
- 논리적 실행 계획은 Dataframe의 계보를 정의
- 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
- 사용자가 물리적 데이터를 직접 다루지 않고, 앞서 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성 제어 (셔플 파티션 수 = 5 → 5개의 출력 파티션 생성)
- spark.conf.set(”spark.sql.shuffle.partitions”, “5”)
- 액션 실행을 위한 설정
2.10.1 DataFrame과 SQL
- 스파크는 언어에 관계없이 같은 방식으로 트랜스포메이션 실행 가능
- SQL, DataFrame으로 비즈니스 로직 표현하면 스파크에서 실제 코드 실행 전에 explain 메소드로 실행 계획 확인 가능
- 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰로 등록한 후 SQL 쿼리 사용 가능
- SQL 쿼리를 DataFrame 코드와 같은 실행 계획으로 컴파일 하므로 스파크, SQL 성능 차이 없음
- createOrTempView 해야 SQL 수행 가능
# createOrReplaceTempView 메서드를 호출하면 모든 dataframe을 table/view로 만들 수 있음
flightData.createOrReplaceTempView("flight_data")
sqlWay = spark.sql("""
SELECT DEST_CONTRY_NAME, count(1)
FROM flight_data
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData
.groupBy("DEST_COUNTRY_NAME")\
.count()
# 두 객체의 실행 계획 동일
sqlWay.explain()
dataFrameWay.explain()
- spark.sql 메소드로 SQL 쿼리 실행
- spark는 SparkSession의 변수
- DataFrame에 쿼리를 수행하면 새로운 DataFrame을 반환
-
spark.sql("SELECT max(count) from flight_data).take(1) # 파이썬 코드 from pyspark.sql.functions import max flightData.select(max("count")).take(1)
- max : DataFrame의 특정 칼럼 값을 스캔하면서 이전 최댓값보다 더 큰 값을 찾음
- 필터링을 수행해 단일 row를 결과로 반환
- max : DataFrame의 특정 칼럼 값을 스캔하면서 이전 최댓값보다 더 큰 값을 찾음
- 2개 실행 결과가 동일함
# 파이썬 코드
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
# 파이썬 코드
from pyspark.sql.functions import desc
flightData\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()
flightData\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain() # 실행 계획 확인 가능
- 실행 계획은 트랜스포메이션의 지향성 비순환 그래프(Directed Acyclic Graph, DAG) 에 해당
- 액션이 호출 되면 결과를 만들어냄
- 각 단계에서는 불변성을 가진 신규 DataFrame 생성 → RDD 생성 아니었나?
- partition_sum : 집계가 두 단계로 나눠짐
- 숫자 목록의 합을 구하는 연산이 가환성(commutative)을 가지고 있어서 집계 연산시 파티션별 처리가 가능하기 때문
- commutative 찾아보기
- 숫자 목록의 합을 구하는 연산이 가환성(commutative)을 가지고 있어서 집계 연산시 파티션별 처리가 가능하기 때문
- physical plan
- 여러 plan을 만들고 가장 좋은 plan을 선정해서 실행
실제 코드 수행 단계
- 데이터 읽기
- 액션이 호출되기 전까지는 데이터 읽지 않음
- 데이터 그룹화
- groupBy 메소드가 호출되면 최종적으로 그룹화된 DataFrame을 지칭하는 RelationalGroupedDataset 반환
- 기본적으로 키/ 키셋을 기준으로 그룹을 만들고 → 각 키에 대한 집계 수행
- 집계 유형을 지정하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum 메소드 사용
- 새로운 스키마 정보를 가지는 별도의 DataFrame 생성
- 신규 스키마에는 새로 만들어진 각 컬럼의 데이터 타입 정보가 들어 있음
- 컬럼명 변경 : withColumnRenamed (트랜스포메이션)
- 데이터 정렬
- 첫번째 row가 destination_total 컬럼에서 가장 큰 값을 가지고 있음
- 역순으로 정렬하기 위해 desc 함수 import
- limit 메소드로 반환 결과의 수 제한
- 상위 5개의 row 반환
- 액션 수행
- 이 단계에서야 DataFrame의 결과를 모으는 프로세스 시작
- 처리가 끝나면 코드를 작성한 프로그래밍 언어에 맞는 리스트/ 배열 반환
- 각 단계에서 dataframe이 아니라 RDD 아닌가?
- 최근 버전 spark는 dataframe 사용. 원시 데이터에만 RDD 사용 (CH3 참고)
- dataframe 쓴다는 것은 결국 RDD 쓰는 것
- 데이터를 스파크가 지원하는 여러 데이터소스(PostgreSQL ...)로 내보내거나 결과를 저장할 수 있음
- 오버헤드: 부하 → 오버헤드가 커지면 컴퓨터의 연산 등에 무리가 감