TechBooks

[Spark] 스파크 완벽 가이드 #2장

꿈나무 김땡땡 2022. 5. 6. 15:15

책 소개

제목 : 스파크 완벽 가이드

저자 : 빌 체임버스, 마테이 자하리아

https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=175546079

 

스파크 완벽 가이드

스파크 창시자가 알려주는 스파크 활용과 배포, 유지 보수의 모든 것. 오픈소스 클러스터 컴퓨팅 프레임워크인 스파크의 창시자가 쓴 스파크에 대한 종합 안내서다. 스파크 사용법부터 배포, 유

www.aladin.co.kr

 

주요 주제

  • 클러스터란?
  • 스파크 애플리케이션 - 드라이버/ 익스큐터/ 클러스터 매니저
  • 언어 - 스칼라/자바/파이썬/R/SQL
  • API - 고수준/저수준
  • 시작하기 - SparkSession
  • SparkSession - spark
  • DataFrame - partition
  • Transformation - narrow dependency/ wide dependency(shuffle)/ Lazy Evaluation
  • Action
  • Spark UI
  • 예제 - explain, read, sort, createOrReplaceTempView
명령어
  • explain
  • read
  • sort
  • createOrReplaceTempView

 

2.1 스파크의 기본 아키텍처

  • 컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용할 수 있게 해준다.
  • 스파크는 클러스터에서 작업을 조율하고 데이터를 처리할 수 있는 프레임워크
  • 스파크가 연산에 사용하는 클러스터는 아래와 같은 클러스터 매니저가 관리: 클러스터 매니저에 애플리케이션 submit → 매니저가 애플리케이션 실행에 필요한 자원 할당 → 할당받은 자원으로 우리는 작업 처리
    • Spark standalone 클러스터 매니저
    • 하둡 YARN
    • Mesos
  • 하둡: 저장용 / 스파크: 분석용

2.1.1 스파크 애플리케이션

  • 스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용한다.
  • 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있다. 
  • 1개의 드라이버 프로세스 + N개의 익스큐터(executor) 프로세스로 구성
    • 드라이버 프로세스 (SparkSession) - 명령
      • 클러스터 노드 중 하나에서 실행 됨
      • main() 함수 실행
      • 스파크 애플리케이션 정보의 유지 관리, 사용자 프로그램/입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포, 스케쥴링
      • 애플리케이션의 수명 주기 동안 관련 정보 모두 유지 (일종의 심장 역할)
      • 스파크의 언어 API를 통해 다양한 언어로 실행 가능
    • 익스큐터 프로세스 - 실행
      • 1. 드라이버 프로세스가 할당한 작업 수행
      • 2. 진행 상황을 다시 드라이버 노드에 보고1개의 드라이버 프로세스 + N개의 익스큐터(executor) 프로세스로 구성

  • 클러스터 매니저
    • 물리적 머신 관리. 스파크애플리케이션에 자원 할당

프로세스(process)

  • 실행 중에 있는 프로그램 (Program)
  • 스케쥴링의 대상이 되는 작업(task)과 같은 의미
  • 프로세스 내부에는 최소 하나의 thread가 있음. 실제로는 thread 단위로 스케쥴링

2.2 스파크의 다양한 언어 API

  1. 스칼라
    • 스파크는 스칼라로 개발 되어 있어 스칼라가 스파크의 기본 언어
  2. 자바
  3. 파이썬
  4. SQL
  5. R
    • sparkR, sparklyr - 2개의 라이브러리 존재

  • 사용자는 스파크 코드를 실행하기 위해 SparkSession 객체를 진입점으로 사용
    • 파이썬, R로 사용할 때는 JVM 코드를 명시적으로 작성하지 않음
    • 스파크가 사용자를 대신해 파이썬, R 코드 → JVM 코드로 변환

2.3 스파크 API

  • 스파크는 기본적으로 2가지 API 제공
  1. 저수준의 비구조적(Unstructured) API
  2. 고수준의 구조적(Structured) API

2.4 스파크 시작하기

  • SparkSession 실행(생성)
    • 대화형 모드로 시작하면 SparkSession 자동 생성 됨
    • standalone 애플리케이션으로 시작하면 사용자 애플리케이션 코드에서 SparkSession 객체 직접 생성 필요
  • 실행 방법

2.5 SparkSession

  • 스파크 애플리케이션은 SparkSession 이라 불리는 드라이버 프로세스로 제어
    • SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행
  • 1개의 SparkSession은 1개의 스파크 애플리케이션에 대응
    • 스칼라, 파이썬 콘솔 시작하면 spark 변수로 SparkSession 사용 가능
spark
---------
[scala console] res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession...
[python console] <pspark.sql.session.SparkSession at 0x7efda4c1ccd0>
  • 일정 범위의 숫자 만들어보기
// scala
val myRange = spark.range(1000).toDF("number")

# python
myRange = spark.range(1000).toDF("number")
  • number 컬럼에 0~999 로우 생성
    • 클러스터 모드라면 이 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당 

2.6 DataFrame

  • 가장 대표적인 구조적 API
  • 데이터프레임은 테이블의 데이터를 로우와 컬럼으로 단순하게 표현
    • Schema(스키마): 컬럼 & 컬럼의 타입을 정의한 목록

    • python, R의 데이터프레임은 일반적으로 단일 컴퓨터에 저장 ↔ spark의 데이터프레임은 분산 컴퓨터에 저장
      • python pandas dataframe, R dataframe → spark dataframe으로 쉽게 변환 가능

2.6.1 파티션

  • 스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 “파티션" 이라 불리는 청크 단위로 데이터 분할
    • 파티션: DF 분할 저장 단위
    • config에서 개수 설정
  • 물리적 파티션에 데이터 변환용 함수를 지정하면 스파크가 실제 처리 방법 결정 (3장에서 자세히)
  • 데이터프레임은 파티션을 수동으로 처리할 필요가 없음 (알아서 처리해줌)

2.7 트랜스포메이션

  • 스파크의 핵심 데이터 구조: Immutable (불변성)
    • 한번 생성하면 변경 불가
  • Transformation: 데이터프레임을 변경하기 위해 변경 방법을 스파크에 알려주는 것
    • // df에서 짝수 찾기
      //scala
      val divisBy2 = myRange.where("number % 2 = 0")
      
      # python
      divisBy2 = myRange.where("number % 2 = 0")
  • 액션을 호출하기 전까지는 실제 트랜스포메이션 수행하지 않음

트랜스포메이션 유형

  • 좁은 의존성 (Narrow Dependency)
    • 하나의 입력 파티션이 하나의 출력 파티션에만 영향
    • where, filter
    • 스파크가 파이프라이닝(Pipelining) 자동으로 수행
      • DF에 여러 필터를 지정하면 모든 작업이 메모리에서 발생
  • 넓은 의존성 (Wide Dependency)
    • 하나의 입력 파티션이 여러 출력 파티션에 영향
    • groupby, join, sort
    • shuffle : 스파크가 클러스터에서 파티션 교환
      • 셔플의 결과를 디스크에 저장
      • ‘셔플 최적화’ 중요한 주제!

2.7.1 지연 연산(Lazy Evaluation)

  • 지연 연산: 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
    • 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 **‘실행 계획'**을 생성
    • 코드를 실행하는 마지막 순간까지 대기 → 원형 DF 트랜스포메이션을 간결한 물리적 실행 계획으로 컴파일
    • 이 과정을 거치며 전체 데이터 흐름 최적화 !!
  • 예시: DF의 조건절 푸시다운(Predicate Pushdown)
    • 복잡한 스파크 잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있을 때: 필요한 레코드 1개만 읽는 것이 가장 효율적
      • 이 필터 계산 작업을 데이터소스(DB)에 위임 → 스파크는 하나의 레코드만 받음 → 처리에 필요한 자원 최소화: 최적화 작업 자동으로 수행

2.8 액션

  • 액션(Action): 실제 연산을 수행하는 명령
    • Transformation으로는 논리적 실행 계획 세우기
    • Action으로 실제 연산 수행
  • 액션을 지정하면 스파크 잡(job) 시작
    • 액션 1개 실행할 때마다 잡 1개 시작
    • Driver → Job → Stage → Task
      • 셔플이 발생하는 지점을 기준으로 스파크 잡 하나를 여러 stage 로 나눈다.
      • 파티션 개수에 따라 task 만들어짐 (task의 수 = partition의 수) → executor에 배분
    • 필터(좁은 트랜스포메이션) 수행 후 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션) → 각 언어에 적합한 네이티브 객체에 결과 모으기
    • 스파크 UI로 클러스터에서 실행중인 스파크 잡 모니터링 가능

2.9 스파크 UI

  • 스파크 UI는 스파크 잡의 진행 상황을 모니터링할 때 사용
  • 드라이버 노드의 4040 포트로 접속
  • 스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보를 확인 가능
    • 스파크 잡 튜닝 & 디버깅 할 때 매우 유용

2.10 종합 예제

  • 데이터는 SparkSession의 DataFrameReader 클래스를 사용해서 읽음
    • 예제는 스파크 Dataframe의 스키마 정보를 알아내는 스키마 추론(schema inference) 기능 사용
  •   flightData2015 = spark\
       .read\
       .option("inferSchema", "true")\
       .option("header", "true")\
       .csv("/data/flight-data/csv/2015-summary.csv")
    
    • DataFrame은 불특정 다수의 row, column을 가짐
    • row 수를 알 수 없는 이유는 데이터를 읽는 과정이 lazy-evaluation 형태의 transformation이기 때
    • flightData2015.take(3) 을 통해 head 명령과 같은 결과 확인 가능
  • explain() 을 통해 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획 확인 가능
    • 예시: flightData2015.sort(”count”).explain()
      • 최종 결과는 가장 위에, 데이터 소스는 가장 아래에 있음
  • 트랜스포메이션 실행 계획을 시작하기 위해 액션을 호출
    • 액션 실행을 위한 설정
      • spark.conf.set(”spark.sql.shuffle.partitions”, “5”)
        • 스파크 셔플 시 기본적으로 200개의 셔플 파티션 생성 → 5로 설정해 출력 파티션 수 줄이기
      • 논리적 실행 계획은 Dataframe의 계보를 정의
        • 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
      • 사용자가 물리적 데이터를 직접 다루지 않고, 앞서 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성 제어 (셔플 파티션 수 = 5 → 5개의 출력 파티션 생성)

2.10.1 DataFrame과 SQL

  • 스파크는 언어에 관계없이 같은 방식으로 트랜스포메이션 실행 가능
    • SQL, DataFrame으로 비즈니스 로직 표현하면 스파크에서 실제 코드 실행 전에 explain 메소드로 실행 계획 확인 가능
  • 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰로 등록한 후 SQL 쿼리 사용 가능
    • SQL 쿼리를 DataFrame 코드와 같은 실행 계획으로 컴파일 하므로 스파크, SQL 성능 차이 없음
  • createOrTempView 해야 SQL 수행 가능
# createOrReplaceTempView 메서드를 호출하면 모든 dataframe을 table/view로 만들 수 있음
flightData.createOrReplaceTempView("flight_data")

sqlWay = spark.sql("""
SELECT DEST_CONTRY_NAME, count(1)
FROM flight_data
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData
 .groupBy("DEST_COUNTRY_NAME")\
 .count()

# 두 객체의 실행 계획 동일
sqlWay.explain()
dataFrameWay.explain()
- spark.sql 메소드로 SQL 쿼리 실행

- spark는 SparkSession의 변수

- DataFrame에 쿼리를 수행하면 새로운 DataFrame을 반환
  •   spark.sql("SELECT max(count) from flight_data).take(1)
      
      # 파이썬 코드
      from pyspark.sql.functions import max
      flightData.select(max("count")).take(1)
    
    • max : DataFrame의 특정 칼럼 값을 스캔하면서 이전 최댓값보다 더 큰 값을 찾음
      • 필터링을 수행해 단일 row를 결과로 반환
  • 2개 실행 결과가 동일함
# 파이썬 코드
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()
# 파이썬 코드
from pyspark.sql.functions import desc

flightData\
 .groupBy("DEST_COUNTRY_NAME")\
 .sum("count")\
 .withColumnRenamed("sum(count)", "destination_total")\
 .sort(desc("destination_total"))\
 .limit(5)\
 .show()

flightData\
 .groupBy("DEST_COUNTRY_NAME")\
 .sum("count")\
 .withColumnRenamed("sum(count)", "destination_total")\
 .sort(desc("destination_total"))\
 .limit(5)\
 .explain() # 실행 계획 확인 가능
  • 실행 계획은 트랜스포메이션의 지향성 비순환 그래프(Directed Acyclic Graph, DAG) 에 해당
    • 액션이 호출 되면 결과를 만들어냄
    • 각 단계에서는 불변성을 가진 신규 DataFrame 생성 → RDD 생성 아니었나?
    • partition_sum : 집계가 두 단계로 나눠짐
      • 숫자 목록의 합을 구하는 연산이 가환성(commutative)을 가지고 있어서 집계 연산시 파티션별 처리가 가능하기 때문
        • commutative 찾아보기
  • physical plan
    • 여러 plan을 만들고 가장 좋은 plan을 선정해서 실행

실제 코드 수행 단계

  1. 데이터 읽기
    • 액션이 호출되기 전까지는 데이터 읽지 않음
  2. 데이터 그룹화
    • groupBy 메소드가 호출되면 최종적으로 그룹화된 DataFrame을 지칭하는 RelationalGroupedDataset 반환
    • 기본적으로 키/ 키셋을 기준으로 그룹을 만들고 → 각 키에 대한 집계 수행
  3. 집계 유형을 지정하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum 메소드 사용
    • 새로운 스키마 정보를 가지는 별도의 DataFrame 생성
    • 신규 스키마에는 새로 만들어진 각 컬럼의 데이터 타입 정보가 들어 있음
  4. 컬럼명 변경 : withColumnRenamed (트랜스포메이션)
  5. 데이터 정렬
    • 첫번째 row가 destination_total 컬럼에서 가장 큰 값을 가지고 있음
    • 역순으로 정렬하기 위해 desc 함수 import
  6. limit 메소드로 반환 결과의 수 제한
    1. 상위 5개의 row 반환
  7. 액션 수행
    • 이 단계에서야 DataFrame의 결과를 모으는 프로세스 시작
    • 처리가 끝나면 코드를 작성한 프로그래밍 언어에 맞는 리스트/ 배열 반환
    • 각 단계에서 dataframe이 아니라 RDD 아닌가?
      • 최근 버전 spark는 dataframe 사용. 원시 데이터에만 RDD 사용 (CH3 참고)
      • dataframe 쓴다는 것은 결국 RDD 쓰는 것
  • 데이터를 스파크가 지원하는 여러 데이터소스(PostgreSQL ...)로 내보내거나 결과를 저장할 수 있음
  • 오버헤드: 부하 → 오버헤드가 커지면 컴퓨터의 연산 등에 무리가 감