λ³Έλ¬Έ λ°”λ‘œκ°€κΈ°
슀파크(Spark)

슀파크 데이터 ν”„λ ˆμž„(생성, μŠ€ν‚€λ§ˆ)

by 크리λ„₯μŠ€μ„λΌμ΄μŠ€ 2023. 1. 14.

μ•ˆλ…•ν•˜μ„Έμš” 크리λ„₯μŠ€μ„λΌμŠ€μž…λ‹ˆλ‹€.

 

μ˜€λŠ˜μ€ 슀파크의 데이터 ν”„λ ˆμž„μ„ μƒμ„±ν•˜λŠ” 법과 μŠ€ν‚€λ§ˆμ— λŒ€ν•΄μ„œ 정리해보렀고 ν•©λ‹ˆλ‹€.

 

데이터 ν”„λ ˆμž„ 생성

λ¨Όμ € 데이터 ν”„λ ˆμž„μ„ μƒμ„±ν•˜λŠ” λ°©λ²•μž…λ‹ˆλ‹€. κ°„λ‹¨ν•˜κ²Œ ν˜„μž¬ EPL 득점 μˆœμœ„ μƒμœ„ 5λͺ…에 λŒ€ν•œ μ •λ³΄λ‘œ λ§Œλ“€μ–΄ λ³΄κ² μŠ΅λ‹ˆλ‹€.

λ¨Όμ € μŠ€μΉΌλΌμ—μ„œ 데이터 ν”„λ ˆμž„μ„ λ§Œλ“€ λ•ŒλŠ” Sequence 클래슀λ₯Ό μ‚¬μš©ν•΄ λ‹€μŒκ³Ό 같이 μž‘μ„±ν•˜λ©΄ λ©λ‹ˆλ‹€.

 

μƒμ„±ν•˜λ©΄ 컬럼 이름이 μ—†κΈ° λ•Œλ¬Έμ— 뒀에 toDF() λͺ…λ Ήμ–΄λ‘œ 컬럼λͺ…을 μ§€μ •ν•΄μ£Όμ‹œλ©΄ 컬럼λͺ…이 μΆ”κ°€λ©λ‹ˆλ‹€.

# 데이터 ν”„λ ˆμž„ 생성(컬럼λͺ… X)
val self_df = spark.createDataFrame(Seq(("μ—˜λ§ ν™€λž€λ“œ", 21), ("해리 케인", 15), 
				        ("이반 ν† λ‹ˆ", 12), ("λ―ΈνŠΈλ‘œλΉ„μΉ˜", 11), 
                                        ("λͺ¨λ ˆλ…Έ", 10)))
# 데이터 ν”„λ ˆμž„ 생성(컬럼λͺ… O)                                        
val self_df = spark.createDataFrame(Seq(("μ—˜λ§ ν™€λž€λ“œ", 21), ("해리 케인", 15), 
					("이반 ν† λ‹ˆ", 12), ("λ―ΈνŠΈλ‘œλΉ„μΉ˜", 11), 
                                        ("λͺ¨λ ˆλ…Έ", 10))).toDF("이름", "득점")

컬럼λͺ… X
컬럼λͺ… O

νŒŒμ΄μ¬μ—μ„œλŠ” Sequence ν΄λž˜μŠ€κ°€ μ•„λ‹ˆλΌ 리슀트([ ])에 λ‹΄μ•„μ£Όμ‹œλ©΄ λ©λ‹ˆλ‹€.

self_df = spark.createDataFrame([("μ—˜λ§ ν™€λž€λ“œ", 21), ("해리 케인", 15), 
			         ("이반 ν† λ‹ˆ", 12), ("λ―ΈνŠΈλ‘œλΉ„μΉ˜", 11), 
                                 ("λͺ¨λ ˆλ…Έ", 10)]).toDF("이름", "득점")

파이썬

 

λ˜ν•œ, RDDλ₯Ό 데이터 ν”„λ ˆμž„μœΌλ‘œλ„ λ³€ν™˜ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

 

λ¨Όμ € μŠ€μΉΌλΌμž…λ‹ˆλ‹€.

# RDD 생성
val self_RDD = sc.parallelize(Array("μ—˜λ§ ν™€λž€λ“œ", "해리 케인", "이반 ν† λ‹ˆ", "λ―ΈνŠΈλ‘œλΉ„μΉ˜", "λͺ¨λ ˆλ…Έ"))

# 데이터 ν”„λ ˆμž„ λ³€ν™˜
val self_RDD_df = self_RDD.toDF()

RDD -> 데이터 ν”„λ ˆμž„
컬럼λͺ… 지정

이름과 득점 ν˜•νƒœλ‘œ λ§Œλ“€λ €λ©΄ λ‹€μŒκ³Ό 같이 ν•˜μ‹œλ©΄ λ©λ‹ˆλ‹€.

val self_RDD2 = sc.parallelize(Seq(("μ—˜λ§ ν™€λž€λ“œ", 21), ("해리 케인", 15), 
				   ("이반 ν† λ‹ˆ", 12), ("λ―ΈνŠΈλ‘œλΉ„μΉ˜", 11), ("λͺ¨λ ˆλ…Έ", 10)))

val self_RDD2_df = self_RDD2.toDF()

νŒŒμ΄μ¬μœΌλ‘œλŠ” νŒλ‹€μŠ€μ˜ 데이터 ν”„λ ˆμž„μ„ 슀파크의 데이터 ν”„λ ˆμž„μœΌλ‘œ λ³€ν™˜ν•΄λ³΄λ„λ‘ ν•˜κ² μŠ΅λ‹ˆλ‹€.

# νŒλ‹€μŠ€ 라이브러리
import pandas as pd

# νŒλ‹€μŠ€ 데이터 ν”„λ ˆμž„ 생성
data = pd.DataFrame({"경기도": ["김포", "파주", "λΆ€μ²œ"], 
		     "μ„œμšΈμ‹œ": ["마곑", "κ°€μ–‘", "마포"], 
         	     "강원도": ["좘천", "ν™μ²œ", "ν™”μ²œ"]})

# 슀파크 데이터 ν”„λ ˆμž„μœΌλ‘œ μ „ν™˜
self_data = spark.createDataFrame(data)

 

μŠ€ν‚€λ§ˆ(Schema)

μŠ€νŒŒν¬μ—μ„œλŠ” 데이터 ν”„λ ˆμž„μ„ 뢈러였기 전에 μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•΄ 지정할 수 μžˆμŠ΅λ‹ˆλ‹€. λ°μ΄ν„°λ‚˜ 컬럼 μˆ˜κ°€ 적으면 크게 μ˜λ―Έκ°€ μ—†μ§€λ§Œ, 데이터 크기도 크고 μ»¬λŸΌλ„ 20개 이상이라면 μŠ€ν‚€λ§ˆλ₯Ό 미리 μ •μ˜ν•΄ μ£ΌλŠ” 게 μ’‹μŠ΅λ‹ˆλ‹€.

 

μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•˜λŠ” 방법은 두 가지가 μžˆμŠ΅λ‹ˆλ‹€. ν•˜λ‚˜λŠ” ν”„λ‘œκ·Έλž˜λ° μŠ€νƒ€μΌ 방법, λ‹€λ₯Έ ν•˜λ‚˜λŠ” DDL(Data Definition Language)μž…λ‹ˆλ‹€.

 

λ°μ΄ν„°λŠ” EPL 득점 μˆœμœ„λ₯Ό μ‚¬μš©ν–ˆμŠ΅λ‹ˆλ‹€.

(득점 변동이 쑰금 μžˆμ—ˆλ„€μš”)

 

첫 번째 방법

# 슀칼라
import org.apache.spark.sql.types._

# μŠ€ν‚€λ§ˆ μ •μ˜
val eplSchema = StructType(Array(StructField("μ„ μˆ˜", StringType, false), 
				  StructField("νŒ€", StringType, false), 
				  StructField("κ²½κΈ°", IntegerType, false), 
				  StructField("득점", IntegerType, false), 
				  StructField("생년월일", DateType, false), 
				  StructField("신체", StringType, false)))

# 데이터 뢈러였기
val epl = spark.read.format("csv")
		    .schema(eplSchema)
                    .option("header", true) # headerλ₯Ό true둜 μ„€μ •ν•΄μ•Ό 컬럼 쀑볡을 ν”Όν•  수 있음
                    .load("/Users/inamsu/Desktop/epl.csv")

header μ˜΅μ…˜μ΄ 없을 λ•Œ
header μ˜΅μ…˜μ΄ μžˆμ„ λ•Œ

# 파이썬
from pyspark.sql.types import *

# μŠ€ν‚€λ§ˆ μ •μ˜
eplSchema = StructType([StructField("μ„ μˆ˜", StringType(), False), 
			StructField("νŒ€", StringType(), False), 
			StructField("κ²½κΈ°", IntegerType(), False), 
			StructField("득점", IntegerType(), False), 
			StructField("생년월일", DateType(), False), 
			StructField("신체", StringType(), False)])

# 데이터 뢈러였기
epl = spark.read.format("csv")
		.schema(eplSchema)
        .option("header", "true)
		.load("/Users/inamsu/Desktop/epl.csv")

 

두 번째 방법

# 슀칼라
# μŠ€ν‚€λ§ˆ μ •μ˜
val eplSchema2 = "name STRING, team STRING, match INT, goal INT, birth DATE, body STRING"

# 데이터 뢈러였기
val epl = spark.read.format("csv")
		    .schema(eplSchema2)
		    .option("header", true)
		    .load("/Users/inamsu/Desktop/epl.csv")

ν•œκΈ€λ‘œ μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•œ κ²½μš°λŠ” Syntax errorκ°€ λ°œμƒν•΄μ„œ μ €λŠ” μ˜μ–΄λ‘œ λ³€κ²½ν–ˆμŠ΅λ‹ˆλ‹€.

Syntax error

# 파이썬
# μŠ€ν‚€λ§ˆ μ •μ˜
eplSchema2 = "name STRING, team STRING, match INT, goal INT, birth DATE, body STRING"

# 데이터 뢈러였기
epl = spark.read.format("csv")
		.schema(eplSchema2)
		.option("header", "true")
		.load("/Users/inamsu/Desktop/epl.csv")

 

첫 번째 방법을 μ‚¬μš©ν•  λ•Œ μ €λŠ” κΌ­ μ‹€μˆ˜κ°€ λ°œμƒν•΄μ„œ 두 번째 방법이 μ’€ 더 직관적인 것 κ°™μŠ΅λ‹ˆλ‹€.

'슀파크(Spark)' μΉ΄ν…Œκ³ λ¦¬μ˜ λ‹€λ₯Έ κΈ€

슀파크 κ²½ν—˜ν•΄λ³΄κΈ°  (0) 2023.01.07