책 소개
제목 : 스파크 완벽 가이드
저자 : 빌 체임버스, 마테이 자하리아
https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=175546079
주요 주제
- spark-submit (SparkSession과 비교?)
- Dataset
- 구조적 스트리밍 - 배치를 연속적으로 처리. 시계열 데이터
- 머신러닝과 고급 분석 - StringIndexer, OneHotEncoder, Vectorization, fitting, train/test
- RDD - 저수준 API
- SparkR
- 서드파티 패키지 에코시스템
3.1 운영용 애플리케이션 실행하기
- spark-submit
- 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할
- 애플리케이션 실행
- 옵션: 자원 할당, 실행 방식 등 지정
- sparkSession 열고 → session에 필요한 옵션들 한번에 설정해줌
./bin/spark-submit \
--master local \ # local에서 실행
./examples/src/main/python/pi.py 10
3.2 Dataset: 타입 안정성을 제공하는 구조적 API
- dataframe: 구조적 API
- dataset: 구조적 API
- RDD: 비구조적 API
- dataset
- 자바, 스칼라용 구조적 API (파이썬, R 지원안함)
- 형태는 dataframe과 동일
- 단, 타입 안정성을 지원 → 초기화에 사용한 클래스 대신 다른 클래스 사용 불가 → 다수의 엔지니어가 함께 개발할 때 유용
- 장점
- 1. 필요한 경우에 선택적으로 사용 가능 (필요하면 dataset 썼다가 DF로 변환)
- 2. collect, take 메소드 호출시 dataset에 매개변수로 지정한 타입의 객체 반환 (타입 안정성 보장)
- 자바, 스칼라용 구조적 API (파이썬, R 지원안함)
3.3 구조적 스트리밍
- 구조적 스트리밍
- 스트림 처리용 고수준 API
- 배치 모드의 연산을 스트리밍 방식으로 실행 가능
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("../data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema
- 시계열 데이터 그룹화/집계
- window function
- 집계 할 때 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우 구성
- 윈도우는 간격을 통해 처리 요건을 명시할 수 있어서 날짜/시각 처리에 유용
- 날짜 데이터를 그룹화 해줌
- 데이터 처리 시점이 아닌 ‘이벤트 발생 시점'을 기준으로 윈도우 구성 가능
import org.apache.spark.sql.functions.{window, col}
staticDataFrame
.selectExpr("CustomerID", "(UnitPrice*Quantity) as total_cost","InvoiceDate")
.groupBy(
col("CutomerID"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
.show(5)
- 런타임 줄이기 위해 출력 파티션 200(default) → 5로 변경
spark.conf.set("spark.sql.shuffle.partitions", "5")
- 스트리밍 코드 작성 (read → readStream, maxFilesPerTrigger 옵션 지정)
- maxFilesPerTrigger : 실제 운영 환경에는 적용 비추
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.format("csv")
.option("header", "true")
.load("../data/retail-data/by-day/*.csv")
- DF가 스트리밍 유형인지 확인
streamingDataFrame.isStreaming // true 반환
- 총 판매금액 계산
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr("CustomerID", "(UnitPrice*Quantity) as total_cost","InvoiceDate")
.groupBy(
col("CustomerID"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
- 총 판매금액 계산해서 저장
purchaseByCustomerPerHour.writeStream
.format("memory") // memory=인메모리 테이블에 저장
.queryName("customer_purchase") // 인메모리 테이블에 저장될 테이블명
.outputMode("complete") // complete=모든 카운트 수행 결과를 테이블에 저장
.start()
- 쿼리 실행 결과 확인
spark.sql("""
SELECT * FROM customer_purchase ORDER BY `sum(total_cost)` DESC""")
.show(5)
- 결과를 콘솔에 출력하기
purchaseByCustomerPerHour.writeStream
.format("console") // console=console에 결과 출력
.queryName("customer_purchase2")
.outputMode("complete")
.start()
3.4 머신러닝과 고급 분석
- MLlib를 이용해 대규모 머신러닝 가능
staticDataFrame.printSchema()
- 날짜 데이터 다루기
import org.apache.spark.sql.functions.date_format
val preppedDataFrame = staticDataFrame
.na.fill(0)
.withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
.coalesce(5)
- 날짜를 기준으로 train set / test set 분리
val trainDataFrame = preppedDataFrame
.where("InvoiceDate < '2011-07-01'")
val testDataFrame = preppedDataFrame
.where("InvoiceDate >= '2011-07-01'")
- 개수 확인
trainDataFrame.count()
testDataFrame.count()
- StringIndexer : MLlib에서 제공하는 트랜스포메이션 자동화 메소드 (요일 → 수치형으로 반환)
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("day_of_week")
.setOutputCol("day_of_week_index")
- OneHotEncoder를 사용해 각 값을 자체 칼럼으로 인코딩
- → 특정 요일이 해당 요일인지 아닌지 boolean 타입으로 나타냄
- StringIndexer는 월요일(0) 보다 토요일(6)이 큰 것처럼 표현 되지만 이 데이터는 크고 작음이 없으므로 불린형으로 변경 필요
import org.apache.spark.ml.feature.OneHotEncoder
val encoder = new OneHotEncoder()
.setInputCol("day_of_week_index")
.setOutputCol("day_of_week_encoded")
- 벡터 타입을 구성할 컬럼 중 하나로 변환
- 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("UnitPrice","Quantity","day_of_week_encoded"))
.setOutputCol("features")
- 나중에 들어올 입력값도 같은 프로세스 거치도록 파이프라인 설정
import org.apache.spark.ml.Pipeline
val transformationPipeline = new Pipeline()
.setStages(Array(indexer, encoder, vectorAssembler))
- fitting (transformation을 데이터 셋에 적합시키기)
val fittedPipeline = transformationPipeline.fit(trainDataFrame)
- train
- 학습을 위한 fitted pipeline이 fitting을 거쳐 준비됨 → 이 파이프라인으로 train data 변환
val transformedTraining = fittedPipeline.transform(trainDataFrame)
- 중간에 train 시킬 때 cache 시키면 메모리에 중간 데이터 저장 됨 → 전체 파이프라인 재실행보다 훨씬 빠르게 반복 가능
transformedTraining.cache()
- 모델 학습시키기
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setK(20).setSeed(1L)
- 스파크 머신러닝 학습 과정
- 1. 아직 학습되지 않은 모델 초기화 (학습전 알고리즘 명칭: 알고리즘명)
- 2. 해당 모델 학습 시키기 (학습후 알고리즘 명칭: 알고리즘명+Model)
- estimator는 StringIndexer와 유사
- fitting
val kmModel = kmeans.fit(transformedTraining)
- 군집 비용 계산 (각 군집 중심점과의 제곱 거리의 합)
// deprecated
kmModel.computeCost(transformedTraining)
// test data 변환
val transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computedCost(transformedTest)
// new version (안되네..)
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val evaluator = new ClusteringEvaluator()
evaluator.evaluate(transformedTraining)
// test data 변환
val transformedTest = fittedPipeline.transform(testDataFrame)
evaluator.evaluate(transformedTest)
3.5 저수준 API
- RDD
- DataFrame 연산도 RDD를 기반으로 만들엊미
- 고수준 API를 사용하는 것이 좋지만 RDD를 이용해 파티션 등 더 세밀한 제어 가능
- 스칼라, 파이썬 모두 사용 가능 but 구현은 조금 다름
- 최신 버전의 스파크에서는 RDD 사용 안함 but 비정형데이터/정제되지않은 원시 데이터에는 RDD 사용 필요
3.6 SparkR
- SparkR
- 스파크를 R 언어로 사용하기 위한 기능
3.7 스파크의 에코시스템과 패키지
- 스파크의 가장 장점은 커뮤니티가 만든 패키지 에코시스템 & 다양한 기능
- spark-packages.org 에서 확인 가능
'TechBooks' 카테고리의 다른 글
[Spark] 스파크 완벽 가이드 #2장 (0) | 2022.05.06 |
---|---|
[Spark] 스파크 완벽 가이드 #1장 (0) | 2022.05.06 |
[Python] 파이썬 코딩의 기술 1장 (0) | 2022.03.06 |
[Programming] 객체지향의 사실과 오해 #7장 (0) | 2020.07.16 |
[Programming] 객체지향의 사실과 오해 #6장 (0) | 2020.07.14 |
댓글