본문 바로가기
TechBooks

[Spark] 스파크 완벽 가이드 #3장

by 꿈나무 김땡땡 2022. 5. 6.

책 소개

제목 : 스파크 완벽 가이드

저자 : 빌 체임버스, 마테이 자하리아

https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=175546079

 

스파크 완벽 가이드

스파크 창시자가 알려주는 스파크 활용과 배포, 유지 보수의 모든 것. 오픈소스 클러스터 컴퓨팅 프레임워크인 스파크의 창시자가 쓴 스파크에 대한 종합 안내서다. 스파크 사용법부터 배포, 유

www.aladin.co.kr

 

주요 주제

  • spark-submit (SparkSession과 비교?)
  • Dataset
  • 구조적 스트리밍 - 배치를 연속적으로 처리. 시계열 데이터
  • 머신러닝과 고급 분석 - StringIndexer, OneHotEncoder, Vectorization, fitting, train/test
  • RDD - 저수준 API
  • SparkR
  • 서드파티 패키지 에코시스템
 

3.1 운영용 애플리케이션 실행하기

  • spark-submit
    • 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할
    • 애플리케이션 실행
    • 옵션: 자원 할당, 실행 방식 등 지정
    • sparkSession 열고 → session에 필요한 옵션들 한번에 설정해줌
./bin/spark-submit \
 --master local \ # local에서 실행
 ./examples/src/main/python/pi.py 10
 

3.2 Dataset: 타입 안정성을 제공하는 구조적 API

  • dataframe: 구조적 API
  • dataset: 구조적 API
  • RDD: 비구조적 API
 
  • dataset
    • 자바, 스칼라용 구조적 API (파이썬, R 지원안함)
      • 형태는 dataframe과 동일
      • 단, 타입 안정성을 지원 → 초기화에 사용한 클래스 대신 다른 클래스 사용 불가 → 다수의 엔지니어가 함께 개발할 때 유용
    • 장점
      • 1. 필요한 경우에 선택적으로 사용 가능 (필요하면 dataset 썼다가 DF로 변환)
      • 2. collect, take 메소드 호출시 dataset에 매개변수로 지정한 타입의 객체 반환 (타입 안정성 보장)
 

3.3 구조적 스트리밍

  • 구조적 스트리밍
    • 스트림 처리용 고수준 API
    • 배치 모드의 연산을 스트리밍 방식으로 실행 가능
 
val staticDataFrame = spark.read.format("csv")
 .option("header", "true")
 .option("inferSchema", "true")
 .load("../data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema
 
  • 시계열 데이터 그룹화/집계
  • window function
    • 집계 할 때 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우 구성
    • 윈도우는 간격을 통해 처리 요건을 명시할 수 있어서 날짜/시각 처리에 유용
    • 날짜 데이터를 그룹화 해줌
    • 데이터 처리 시점이 아닌 ‘이벤트 발생 시점'을 기준으로 윈도우 구성 가능
import org.apache.spark.sql.functions.{window, col}

staticDataFrame
.selectExpr("CustomerID", "(UnitPrice*Quantity) as total_cost","InvoiceDate")
.groupBy(
 col("CutomerID"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
.show(5)
  • 런타임 줄이기 위해 출력 파티션 200(default) → 5로 변경
spark.conf.set("spark.sql.shuffle.partitions", "5")
 
  • 스트리밍 코드 작성 (read → readStream, maxFilesPerTrigger 옵션 지정)
    • maxFilesPerTrigger : 실제 운영 환경에는 적용 비추
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.format("csv")
.option("header", "true")
.load("../data/retail-data/by-day/*.csv")
  • DF가 스트리밍 유형인지 확인
streamingDataFrame.isStreaming // true 반환
 
  • 총 판매금액 계산
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr("CustomerID", "(UnitPrice*Quantity) as total_cost","InvoiceDate")
.groupBy(
 col("CustomerID"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
  • 총 판매금액 계산해서 저장
purchaseByCustomerPerHour.writeStream
.format("memory") // memory=인메모리 테이블에 저장
.queryName("customer_purchase") // 인메모리 테이블에 저장될 테이블명
.outputMode("complete") // complete=모든 카운트 수행 결과를 테이블에 저장
.start()
 
  • 쿼리 실행 결과 확인
spark.sql("""
SELECT * FROM customer_purchase ORDER BY `sum(total_cost)` DESC""")
.show(5)
  • 결과를 콘솔에 출력하기
purchaseByCustomerPerHour.writeStream
.format("console") // console=console에 결과 출력
.queryName("customer_purchase2")
.outputMode("complete")
.start()
 

3.4 머신러닝과 고급 분석

  • MLlib를 이용해 대규모 머신러닝 가능
staticDataFrame.printSchema()
  • 날짜 데이터 다루기
import org.apache.spark.sql.functions.date_format

val preppedDataFrame = staticDataFrame
.na.fill(0)
.withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
.coalesce(5)
  • 날짜를 기준으로 train set / test set 분리
val trainDataFrame = preppedDataFrame
.where("InvoiceDate < '2011-07-01'")
val testDataFrame = preppedDataFrame
.where("InvoiceDate >= '2011-07-01'")
  • 개수 확인
trainDataFrame.count()
testDataFrame.count()
  • StringIndexer : MLlib에서 제공하는 트랜스포메이션 자동화 메소드 (요일 → 수치형으로 반환)
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
.setInputCol("day_of_week")
.setOutputCol("day_of_week_index")
  • OneHotEncoder를 사용해 각 값을 자체 칼럼으로 인코딩
    • → 특정 요일이 해당 요일인지 아닌지 boolean 타입으로 나타냄
    • StringIndexer는 월요일(0) 보다 토요일(6)이 큰 것처럼 표현 되지만 이 데이터는 크고 작음이 없으므로 불린형으로 변경 필요
import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
.setInputCol("day_of_week_index")
.setOutputCol("day_of_week_encoded")
  • 벡터 타입을 구성할 컬럼 중 하나로 변환
    • 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
import org.apache.spark.ml.feature.VectorAssembler

val vectorAssembler = new VectorAssembler()
.setInputCols(Array("UnitPrice","Quantity","day_of_week_encoded"))
.setOutputCol("features")
  • 나중에 들어올 입력값도 같은 프로세스 거치도록 파이프라인 설정
import org.apache.spark.ml.Pipeline
val transformationPipeline = new Pipeline()
.setStages(Array(indexer, encoder, vectorAssembler))
  • fitting (transformation을 데이터 셋에 적합시키기)
val fittedPipeline = transformationPipeline.fit(trainDataFrame)
  • train
    • 학습을 위한 fitted pipeline이 fitting을 거쳐 준비됨 → 이 파이프라인으로 train data 변환
val transformedTraining = fittedPipeline.transform(trainDataFrame)
  • 중간에 train 시킬 때 cache 시키면 메모리에 중간 데이터 저장 됨 → 전체 파이프라인 재실행보다 훨씬 빠르게 반복 가능
transformedTraining.cache()
 
  • 모델 학습시키기
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setK(20).setSeed(1L)
  • 스파크 머신러닝 학습 과정
    • 1. 아직 학습되지 않은 모델 초기화 (학습전 알고리즘 명칭: 알고리즘명)
    • 2. 해당 모델 학습 시키기 (학습후 알고리즘 명칭: 알고리즘명+Model)
  • estimator는 StringIndexer와 유사
  • fitting
val kmModel = kmeans.fit(transformedTraining)
  • 군집 비용 계산 (각 군집 중심점과의 제곱 거리의 합)
// deprecated
kmModel.computeCost(transformedTraining)

// test data 변환
val transformedTest = fittedPipeline.transform(testDataFrame)

kmModel.computedCost(transformedTest)
// new version (안되네..)
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val evaluator = new ClusteringEvaluator()
evaluator.evaluate(transformedTraining)

// test data 변환
val transformedTest = fittedPipeline.transform(testDataFrame)
evaluator.evaluate(transformedTest)
 

3.5 저수준 API

  • RDD
    • DataFrame 연산도 RDD를 기반으로 만들엊미
    • 고수준 API를 사용하는 것이 좋지만 RDD를 이용해 파티션 등 더 세밀한 제어 가능
    • 스칼라, 파이썬 모두 사용 가능 but 구현은 조금 다름
  • 최신 버전의 스파크에서는 RDD 사용 안함 but 비정형데이터/정제되지않은 원시 데이터에는 RDD 사용 필요
 

3.6 SparkR

  • SparkR
    • 스파크를 R 언어로 사용하기 위한 기능

3.7 스파크의 에코시스템과 패키지

  • 스파크의 가장 장점은 커뮤니티가 만든 패키지 에코시스템 & 다양한 기능
  • spark-packages.org 에서 확인 가능

댓글