λ³Έλ¬Έ λ°”λ‘œκ°€κΈ°
슀파크(Spark)

슀파크 κ²½ν—˜ν•΄λ³΄κΈ°

by 크리λ„₯μŠ€μ„λΌμ΄μŠ€ 2023. 1. 7.

μ•ˆλ…•ν•˜μ„Έμš” 크리λ„₯μŠ€μ„λΌμŠ€μž…λ‹ˆλ‹€.

 

슀파크λ₯Ό κ³΅λΆ€ν•˜κΈ° μ‹œμž‘ν•˜λ©΄μ„œ κ³΅λΆ€ν•œ λ‚΄μš©λ“€μ„ μ‘°κΈˆμ”© 정리해 λ³Όλ €κ³  ν•©λ‹ˆλ‹€.

 

μ €λŠ” λ§₯뢁 m1 에어λ₯Ό μ‚¬μš©ν•˜κ³  있으며, 슀파크 μ„€μΉ˜ 버전은 3.3.1 μž…λ‹ˆλ‹€.

λ§₯뢁 m1 에어

파이썬

μ €λŠ” ν„°λ―Έλ„μ—μ„œ pyspark ν˜Ήμ€ Spark의 bin ν΄λ”μ—μ„œ pyspark λͺ…λ Ήμ–΄λ‘œ λ‹€μŒκ³Ό 같은 화면을 μ‹€ν–‰μ‹œμΌ°μŠ΅λ‹ˆλ‹€.

이 ν™”λ©΄μ—μ„œ μ €ν¬λŠ” 슀파크λ₯Ό μ΄μš©ν•΄ 데이터 뢄석을 μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

 

저도 λ°°μ›Œκ°€λŠ” μž…μž₯μ΄λ‹ˆ κ°„λ‹¨ν•˜κ²Œ 2022 μ‹œμ¦Œ K리그 득점 μˆœμœ„ 기둝을 가지고 데이터λ₯Ό 닀뀄보도둝 ν•˜κ² μŠ΅λ‹ˆλ‹€.

득점 μˆœμœ„

μ €λŠ” κ·Έλƒ₯ μ›ΉνŽ˜μ΄μ§€μ˜ 글을 λ³΅μ‚¬ν•΄μ„œ ν•˜λ‚˜μ˜ csv 파일둜 λ§Œλ“€μ—ˆμŠ΅λ‹ˆλ‹€.

csv 파일

데이터가 μ€€λΉ„λ˜μ—ˆλ‹€λ©΄ ν•œ 번 μŠ€νŒŒν¬μ—μ„œ 데이터λ₯Ό λΆˆλŸ¬μ˜€λ„λ‘ ν•˜κ² μŠ΅λ‹ˆλ‹€.

 

μŠ€νŒŒν¬μ—μ„œ 데이터λ₯Ό 뢈러올 λ•ŒλŠ” λ‹€μŒκ³Ό 같은 λͺ…령문을 μ‚¬μš©ν•˜λŠ”λ° μžμ„Έν•œ κ±°λŠ” λ’€μ—μ„œ μžμ„Ένžˆ 닀뀄보도둝 ν•˜κ² μŠ΅λ‹ˆλ‹€.

 

data = spark.read.format()  # 뢈러올 λ°μ΄ν„°μ˜ ν˜•μ‹  
                 .option("header", "true") # 파일 λ‚΄ 컬럼 이름 μ‚¬μš©
                 .option("inferSchema", "true") # μŠ€ν‚€λ§ˆλ₯Ό μΆ”λ‘ 
                 .load()  # 뢈러올 파일 경둜
data.show()  # 뢈러온 데이터λ₯Ό 좜λ ₯ν•˜λŠ” λͺ…λ Ήμ–΄. κ΄„ν˜Έ μ•ˆμ— 좜λ ₯ν•˜κ³  싢은 개수λ₯Ό λ„£μœΌλ©΄ λœλ‹€.

 

μ‹€ν–‰ κ²°κ³Ό

μ €λŠ” λ‹€ν–‰νžˆ 데이터가 μ œλŒ€λ‘œ 좜λ ₯이 λ˜μ—ˆμŠ΅λ‹ˆλ‹€.

 

컬럼이 λ§Žμ•„μ„œ 보기 νž˜λ“œλ‹ˆ μ›ν•˜λŠ” 컬러만 κ³¨λΌμ„œ 좜λ ₯ν•΄λ³΄κ² μŠ΅λ‹ˆλ‹€.

 

data.select()  # κ΄„ν˜Έ μ•ˆμ— μ„ νƒν•œ 컬럼λͺ…을 적으면 λœλ‹€.

 

μ—¬κΈ°μ„œ μ£Όμ˜ν•  점은 select λͺ…λ Ήμ–΄λ§Œ 적고 μ—”ν„°λ₯Ό λˆ„λ₯΄λ©΄ μ›ν•˜λŠ” ν˜•νƒœλ‘œ 좜λ ₯은 λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. 

select만 μ“Έ λ•Œ

κ·Έ μ΄μœ λŠ” μŠ€νŒŒν¬κ°€ μž‘λ™ν•˜λŠ” 원리에 μžˆμŠ΅λ‹ˆλ‹€. 잠깐 μ„€λͺ…ν•˜λ©΄ 슀파크의 연산은 νŠΈλžœμŠ€ν¬λ©”μ΄μ…˜κ³Ό μ•‘μ…˜μœΌλ‘œ κ΅¬λΆ„λ˜λ©°,

 

νŠΈλžœμŠ€ν¬λ©”μ΄μ…˜ 연산은 μ¦‰μ‹œ κ²°κ³Όκ°€ κ³„μ‚°λ˜λŠ” 것이 μ•„λ‹ˆλΌ 계보(lineage)라 λΆˆλ¦¬λŠ” ν˜•νƒœλ‘œ κΈ°λ‘λ˜λ‹€κ°€ μ•‘μ…˜ 연산을 λ§Œλ‚˜λ©΄ 늦게 싀행이 λ©λ‹ˆλ‹€.

  • νŠΈλžœμŠ€ν¬λ§€μ΄μ…˜ : orderBy(), groupBy(), filter(), select(), join()
  • μ•‘μ…˜ : show(), take(), count(), collect(), save()

μŠ€νŒŒν¬κ°€ 이런 연산을 ν•˜λŠ” μ΄μœ λŠ” μž₯μ• κ°€ λ°œμƒν–ˆμ„ λ•Œ 기둝된 계보λ₯Ό 따라가면 μ‰½κ²Œ 볡ꡬ가 κ°€λŠ₯ν•˜λ‹€λŠ” 점과 μ‹€ν–‰μ˜ ν›„λ°˜λΆ€μ— νŠΈλžœμŠ€ν¬λ©”μ΄μ…˜μ„ 더 효율적으둜 μ‹€ν–‰ν•  수 μžˆλ‹€λŠ” μ μž…λ‹ˆλ‹€.

 

μ΄μ–΄μ„œ μ €λŠ” νŒ€λ³„ 득점 정보λ₯Ό 좜λ ₯ν–ˆμŠ΅λ‹ˆλ‹€.

 

μ „λΆμ˜ μ‘°κ·œμ„± μ„ μˆ˜κ°€ 득점 μˆœμœ„ 1μœ„μ§€λ§Œ 30λͺ…μ˜ μ„ μˆ˜λ“€ μ€‘μ—μ„œλŠ” μšΈμ‚°μ΄ κ°€μž₯ λ§Žμ€ 득점을 κΈ°λ‘ν–ˆμŠ΅λ‹ˆλ‹€.

 

data.groupyBy()  # κ·Έλ£Ήν™”ν•  컬럼 이름 μ„€μ •
    .sum()       # 연산을 진행할 컬럼 μ„€μ •
    .orderBy()   # μ •λ ¬ν•  컬럼 μ„€μ •, ascending = True둜 ν•˜λ©΄ μ˜€λ¦„μ°¨μˆœμœΌλ‘œ μ •λ ¬λœλ‹€.
    .show()

 

 

쑰금 더 μžμ„Ένžˆ 보기 μœ„ν•΄ 득점 외에 도움과 μŠˆνŒ…, μ˜€ν”„μ‚¬μ΄λ“œ μ»¬λŸΌλ„ μΆ”κ°€ν–ˆμŠ΅λ‹ˆλ‹€. 정렬은 득점 κΈ°μ€€μž…λ‹ˆλ‹€.

 

μ΄λ²ˆμ—λŠ” μŠˆνŒ… νšŸμˆ˜κ°€ 70번 이상인 μ„ μˆ˜λ“€λ§Œ 좜λ ₯ν•˜λ„λ‘ 쑰건을 λ„£μ—ˆμŠ΅λ‹ˆλ‹€ .

 

data.where()  # λ³€μˆ˜λͺ….컬럼λͺ…을 μ΄μš©ν•΄ 쑰건을 μ„€μ •
    .show()

 

쑰건을 ν•˜λ‚˜ 더 μΆ”κ°€ν•΄ μŠˆνŒ…μ„ 70번 μ΄μƒν–ˆμ§€λ§Œ 득점은 15번 μ΄ν•˜μΈ μ„ μˆ˜λ“€μ„ 좜λ ₯ν–ˆμŠ΅λ‹ˆλ‹€.

 

μ§€κΈˆκΉŒμ§€λŠ” 터미널 μ°½μ—μ„œ μ½”λ“œλ₯Ό ν•˜λ‚˜ν•˜λ‚˜ μˆ˜ν–‰ν–ˆλ‹€λ©΄ μ΄λ²ˆμ—λŠ” 파이썬 파일둜 같은 κ²°κ³Όλ₯Ό 좜λ ₯ν•΄λ³΄λŠ” λ°©λ²•μž…λ‹ˆλ‹€.

 

λ¨Όμ € λ‹€μŒκ³Ό 같은 μ½”λ“œλ₯Ό k-league.py 파일둜 μƒμ„±ν•©λ‹ˆλ‹€.

 

# ν•„μš”ν•œ 라이브러리
import sys
from pyspark.sql import SparkSession

# μ½”λ“œ μ‹œμž‘
if __name__ == "__main__":
    # spark-submit을 ν•  λ•Œ μΆ”κ°€μ μœΌλ‘œ 파일 경둜λ₯Ό μž…λ ₯ μ•ˆ ν•  λ•Œ 좜λ ₯λ˜λŠ” κ²½κ³ λ¬Έ
    if len(sys.argv) != 2:
        print("λ‹€μ‹œ μ‹œλ„ν•΄ μ£Όμ„Έμš”.")
        sys.exit(-1)
    
    # SparkSession 객체λ₯Ό 생성 > pyspark μ°½μ΄λ‚˜ spark-shellμ—μ„œλŠ” ν•„μš”X
    spark = (SparkSession
             .builder
             .appName("k-league")
             .getOrCreate())
    
    # 터미널 μ°½μ—μ„œ μž…λ ₯ 받은 파일 경둜
    file_path = sys.argv[1]
    
    # 데이터 뢈러였기
    data = (spark.read.format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load(file_path))
    
    data.show(10)
    
    data2 = data.select("μˆœμœ„", "μ„ μˆ˜λͺ…", "νŒ€", "득점", "μŠˆνŒ…", "좜μž₯", "κ²½κΈ°λ‹Ή 기둝")
    data2.show(10)
    
    data3 = data.groupBy("νŒ€").sum("득점").orderBy("sum(득점)", ascending = False)    
    data3.show()
    
    
    data4 = data.groupBy("νŒ€").sum("득점", "도움", "μŠˆνŒ…", "μ˜€ν”„μ‚¬μ΄λ“œ").orderBy("sum(득점)", ascending = False)    
    data4.show()
    
    
    data5 = data.where(data.μŠˆνŒ… >= 70)
    data5.show()

    data6 = data.where(data.μŠˆνŒ… >= 70).where(data.득점 < 15)
    data6.show()
    
    # SparkSession μ’…λ£Œ
    spark.stop()

 

Spark의 bin ν΄λ”μ—μ„œ spark-submit 을 μ΄μš©ν•΄ 파이썬 νŒŒμΌμ„ μ‹€ν–‰μ‹œν‚΅λ‹ˆλ‹€.

(μ €λŠ” 미리 ν™˜κ²½λ³€μˆ˜μ— $SPARK_HOME을 μ§€μ •ν•΄μ€˜μ„œ $SPARK_HOME/bin/spark-submit을 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.)

spark경둜/bin/spark-submit 파이썬 파일 경둜

 

λ§Œμ•½ 파이썬 파일 경둜 뒀에 csv 파일 경둜λ₯Ό 넣지 μ•ŠμœΌλ©΄ 파이썬 νŒŒμΌμ—μ„œ μ„€μ •ν•œ λŒ€λ‘œ λ‹€μŒκ³Ό 같이 좜λ ₯λ©λ‹ˆλ‹€.

csv 파일 κ²½λ‘œκΉŒμ§€ λ„£μ–΄μ„œ μ‹€ν–‰μ‹œν‚€λ©΄ λ‹€μŒκ³Ό 같이 μ°¨λ‘€λŒ€λ‘œ 좜λ ₯λ©λ‹ˆλ‹€.

 

 

좜λ ₯λ˜λŠ” 와쀑에 μ €μ²˜λŸΌ κΉ”λ”ν•˜κ²Œ 좜λ ₯λ˜μ§€ μ•Šκ³  INFO λ‘œκ·Έλ“€μ΄ μ—„μ²­ 많이 좜λ ₯λ˜μ‹œλŠ” 뢄듀이 μžˆλ‹€λ©΄

 

이 λΈ”λ‘œκ·Έλ₯Ό μ°Έμ‘°ν•˜μ…”μ„œ INFO λ‘œκ·Έκ°€ 좜λ ₯λ˜μ§€ μ•Šλ„λ‘ μ„€μ •ν•΄ μ£Όμ‹œλ©΄ λ©λ‹ˆλ‹€.

 

λ˜ν•œ, μ œλŒ€λ‘œ ν•œ κ±° 같은데 싀행이 μ•ˆ λ˜μ‹ λ‹€λ©΄ μ €μ²˜λŸΌ pyspark 라이브러리λ₯Ό μ„€μΉ˜ν•˜μ§€ μ•ŠμœΌμ‹ κ±΄μ§€

 

ν™•μΈν•΄λ³΄μ‹œλ©΄ 될 것 κ°™μŠ΅λ‹ˆλ‹€.

 

μ—¬κΈ°κΉŒμ§€λŠ” νŒŒμ΄μ¬μ„ μ΄μš©ν•΄ 슀파크λ₯Ό μ‚¬μš©ν•΄λ³΄λŠ” λ°©λ²•μ΄μ—ˆμŠ΅λ‹ˆλ‹€.

 

슀칼라λ₯Ό μ‚¬μš©ν•˜κ³  μ‹ΆμœΌμ‹  뢄듀은 pysparkλŒ€μ‹  spark-shell을 터미널에 μž…λ ₯ν•˜μ‹œλ©΄ λ©λ‹ˆλ‹€.

 

슀칼라λ₯Ό μ΄μš©ν•  λ•Œ 파이썬과 λ‹€λ₯Έ κ°€μž₯ 큰 뢀뢄은 λ³€μˆ˜λ₯Ό 생성할 λ•Œ val λ³€μˆ˜λͺ…이 λœλ‹€λŠ” κ²ƒμž…λ‹ˆλ‹€. 

슀칼라λ₯Ό μ΄μš©ν•œ 뢀뢄은 쑰금 더 κ³΅λΆ€ν•΄μ„œ λ‹€μŒ κΈ€λΆ€ν„°λŠ” 같이 λ‹€λ£° 수 μžˆλ„λ‘ ν•˜κ² μŠ΅λ‹ˆλ‹€.

 

κ°μ‚¬ν•©λ‹ˆλ‹€.