μλ νμΈμ ν¬λ¦¬λ₯μ€μλΌμ€μ λλ€.
μ€λμ μ€νν¬μ λ°μ΄ν° νλ μμ μμ±νλ λ²κ³Ό μ€ν€λ§μ λν΄μ μ 리ν΄λ³΄λ €κ³ ν©λλ€.
λ°μ΄ν° νλ μ μμ±
λ¨Όμ λ°μ΄ν° νλ μμ μμ±νλ λ°©λ²μ λλ€. κ°λ¨νκ² νμ¬ EPL λμ μμ μμ 5λͺ μ λν μ λ³΄λ‘ λ§λ€μ΄ λ³΄κ² μ΅λλ€.
λ¨Όμ μ€μΉΌλΌμμ λ°μ΄ν° νλ μμ λ§λ€ λλ Sequence ν΄λμ€λ₯Ό μ¬μ©ν΄ λ€μκ³Ό κ°μ΄ μμ±νλ©΄ λ©λλ€.
μμ±νλ©΄ μ»¬λΌ μ΄λ¦μ΄ μκΈ° λλ¬Έμ λ€μ toDF() λͺ λ Ήμ΄λ‘ 컬λΌλͺ μ μ§μ ν΄μ£Όμλ©΄ 컬λΌλͺ μ΄ μΆκ°λ©λλ€.
# λ°μ΄ν° νλ μ μμ±(컬λΌλͺ
X)
val self_df = spark.createDataFrame(Seq(("μλ§ νλλ", 21), ("ν΄λ¦¬ μΌμΈ", 15),
("μ΄λ° ν λ", 12), ("λ―ΈνΈλ‘λΉμΉ", 11),
("λͺ¨λ λ
Έ", 10)))
# λ°μ΄ν° νλ μ μμ±(컬λΌλͺ
O)
val self_df = spark.createDataFrame(Seq(("μλ§ νλλ", 21), ("ν΄λ¦¬ μΌμΈ", 15),
("μ΄λ° ν λ", 12), ("λ―ΈνΈλ‘λΉμΉ", 11),
("λͺ¨λ λ
Έ", 10))).toDF("μ΄λ¦", "λμ ")
νμ΄μ¬μμλ Sequence ν΄λμ€κ° μλλΌ λ¦¬μ€νΈ([ ])μ λ΄μμ£Όμλ©΄ λ©λλ€.
self_df = spark.createDataFrame([("μλ§ νλλ", 21), ("ν΄λ¦¬ μΌμΈ", 15),
("μ΄λ° ν λ", 12), ("λ―ΈνΈλ‘λΉμΉ", 11),
("λͺ¨λ λ
Έ", 10)]).toDF("μ΄λ¦", "λμ ")
λν, RDDλ₯Ό λ°μ΄ν° νλ μμΌλ‘λ λ³νν μ μμ΅λλ€.
λ¨Όμ μ€μΉΌλΌμ λλ€.
# RDD μμ±
val self_RDD = sc.parallelize(Array("μλ§ νλλ", "ν΄λ¦¬ μΌμΈ", "μ΄λ° ν λ", "λ―ΈνΈλ‘λΉμΉ", "λͺ¨λ λ
Έ"))
# λ°μ΄ν° νλ μ λ³ν
val self_RDD_df = self_RDD.toDF()
μ΄λ¦κ³Ό λμ ννλ‘ λ§λ€λ €λ©΄ λ€μκ³Ό κ°μ΄ νμλ©΄ λ©λλ€.
val self_RDD2 = sc.parallelize(Seq(("μλ§ νλλ", 21), ("ν΄λ¦¬ μΌμΈ", 15),
("μ΄λ° ν λ", 12), ("λ―ΈνΈλ‘λΉμΉ", 11), ("λͺ¨λ λ
Έ", 10)))
val self_RDD2_df = self_RDD2.toDF()
νμ΄μ¬μΌλ‘λ νλ€μ€μ λ°μ΄ν° νλ μμ μ€νν¬μ λ°μ΄ν° νλ μμΌλ‘ λ³νν΄λ³΄λλ‘ νκ² μ΅λλ€.
# νλ€μ€ λΌμ΄λΈλ¬λ¦¬
import pandas as pd
# νλ€μ€ λ°μ΄ν° νλ μ μμ±
data = pd.DataFrame({"κ²½κΈ°λ": ["κΉν¬", "νμ£Ό", "λΆμ²"],
"μμΈμ": ["λ§κ³‘", "κ°μ", "λ§ν¬"],
"κ°μλ": ["μΆμ²", "νμ²", "νμ²"]})
# μ€νν¬ λ°μ΄ν° νλ μμΌλ‘ μ ν
self_data = spark.createDataFrame(data)
μ€ν€λ§(Schema)
μ€νν¬μμλ λ°μ΄ν° νλ μμ λΆλ¬μ€κΈ° μ μ μ€ν€λ§λ₯Ό μ μν΄ μ§μ ν μ μμ΅λλ€. λ°μ΄ν°λ μ»¬λΌ μκ° μ μΌλ©΄ ν¬κ² μλ―Έκ° μμ§λ§, λ°μ΄ν° ν¬κΈ°λ ν¬κ³ 컬λΌλ 20κ° μ΄μμ΄λΌλ©΄ μ€ν€λ§λ₯Ό 미리 μ μν΄ μ£Όλ κ² μ’μ΅λλ€.
μ€ν€λ§λ₯Ό μ μνλ λ°©λ²μ λ κ°μ§κ° μμ΅λλ€. νλλ νλ‘κ·Έλλ° μ€νμΌ λ°©λ², λ€λ₯Έ νλλ DDL(Data Definition Language)μ λλ€.
λ°μ΄ν°λ EPL λμ μμλ₯Ό μ¬μ©νμ΅λλ€.
(λμ λ³λμ΄ μ‘°κΈ μμλ€μ)
첫 λ²μ§Έ λ°©λ²
# μ€μΉΌλΌ
import org.apache.spark.sql.types._
# μ€ν€λ§ μ μ
val eplSchema = StructType(Array(StructField("μ μ", StringType, false),
StructField("ν", StringType, false),
StructField("κ²½κΈ°", IntegerType, false),
StructField("λμ ", IntegerType, false),
StructField("μλ
μμΌ", DateType, false),
StructField("μ 체", StringType, false)))
# λ°μ΄ν° λΆλ¬μ€κΈ°
val epl = spark.read.format("csv")
.schema(eplSchema)
.option("header", true) # headerλ₯Ό trueλ‘ μ€μ ν΄μΌ μ»¬λΌ μ€λ³΅μ νΌν μ μμ
.load("/Users/inamsu/Desktop/epl.csv")
# νμ΄μ¬
from pyspark.sql.types import *
# μ€ν€λ§ μ μ
eplSchema = StructType([StructField("μ μ", StringType(), False),
StructField("ν", StringType(), False),
StructField("κ²½κΈ°", IntegerType(), False),
StructField("λμ ", IntegerType(), False),
StructField("μλ
μμΌ", DateType(), False),
StructField("μ 체", StringType(), False)])
# λ°μ΄ν° λΆλ¬μ€κΈ°
epl = spark.read.format("csv")
.schema(eplSchema)
.option("header", "true)
.load("/Users/inamsu/Desktop/epl.csv")
λ λ²μ§Έ λ°©λ²
# μ€μΉΌλΌ
# μ€ν€λ§ μ μ
val eplSchema2 = "name STRING, team STRING, match INT, goal INT, birth DATE, body STRING"
# λ°μ΄ν° λΆλ¬μ€κΈ°
val epl = spark.read.format("csv")
.schema(eplSchema2)
.option("header", true)
.load("/Users/inamsu/Desktop/epl.csv")
νκΈλ‘ μ€ν€λ§λ₯Ό μ μν κ²½μ°λ Syntax errorκ° λ°μν΄μ μ λ μμ΄λ‘ λ³κ²½νμ΅λλ€.
# νμ΄μ¬
# μ€ν€λ§ μ μ
eplSchema2 = "name STRING, team STRING, match INT, goal INT, birth DATE, body STRING"
# λ°μ΄ν° λΆλ¬μ€κΈ°
epl = spark.read.format("csv")
.schema(eplSchema2)
.option("header", "true")
.load("/Users/inamsu/Desktop/epl.csv")
첫 λ²μ§Έ λ°©λ²μ μ¬μ©ν λ μ λ κΌ μ€μκ° λ°μν΄μ λ λ²μ§Έ λ°©λ²μ΄ μ’ λ μ§κ΄μ μΈ κ² κ°μ΅λλ€.
'μ€νν¬(Spark)' μΉ΄ν κ³ λ¦¬μ λ€λ₯Έ κΈ
μ€νν¬ κ²½νν΄λ³΄κΈ° (0) | 2023.01.07 |
---|