Sparkによるデータ処理

with コメントはまだありません

Sparkを使ったデータ処理を紹介します。Sparkは、分散並列処理の機能を提供するオープンソースライブラリです。数百万、数十億のレコード数といった非整形のデータを高速に処理する事を可能にします。Sparkを利用する際、何十台、何百台、何千台のコンピューターを一つのシステムとして統合し、膨大なデータを分割して、手分けして処理を行います。1台のコンピューターからでも利用が可能です。今回はデモデータを利用しますが、今後、LiDARなどで取得した点群データも処理します。

Sparkの利用

SparkはScala言語で開発されているライブラリです。Javaでも利用が可能です。また、PythonやRなどの言語で利用する機能も存在します。今回利用したパソコンやツールは以下の通りです。

IntelliJ IDEA Ultimate

JetBrains社が提供するIDEです。JavaやScalaなどのプログラムを開発する際に有用なツールです。Ultimateは有料版で年間$150です。Comunity版は無料ですが、Ultimate版の方が強力な機能を利用することができます。この記事ではUltimateを利用しておりますが、Community版でどれほどの機能が提供されているか確認しておりません。

もちろん、IntelliJを利用しなくてもSparkは利用できますが、使うと便利です。

KaggleのNYCタクシーデータ

今回、Sparkで利用したデータは、Kaggleで提供されているNYCタクシーデータです。Kaggleは、機械学習のコンペティションを開催しているウェブサイトです。様々な企業や組織がKaggle上でコンペティションを開催し、より高精度な機械学習モデルの開発を一定期間競います。チームや個人でも参加が可能です。トップの成績を残した参加者は、賞金などを得られます。NYCタクシーデータのコンペティションでは、ニューヨーク市内のタクシーデータを利用して、タクシー代金を推定するモデルを開発します。Googleが開催元です。既に終了しているコンペティションでも、データをダウンロードすることが可能です。データはコチラでダウンロードができます。ダウンロードするには、Kaggleでアカウントを作成する必要があります。

データのレコード数は5,000万以上あります。データ分析には、Pythonがよく使われています。KaggleでもPythonがよく利用されています。Pythonを使う主な利点の1つは、グラフ描画だと思います。データ分析を行う際、グラフを描画し、データの特徴を可視化しながら、解釈する工程が非常に重要です。Pythonは可視化するライブラリが豊富なので、Kaggleでもよく利用されています。しかし、今回のようにデータのレコードが数千万とある場合、Pythonで短時間に処理させることが困難です。

今回は、5,000万ほどの静的なデータを処理しますが、数億単位のデータを毎日行う場合、短時間にデータを処理する必要があります。このような状況で、Sparkは強力なツールとなります。

今回、提供されているデータの内容は以下の通りです。

  • 乗車した日付時間
  • 乗車した緯度経度
  • 降車した日付時間
  • 降車した緯度経度
  • タクシー会社の通し番号
  • 乗車した人数
  • 料金

以上の情報を整形し、機械学習モデルを作りやすくする必要があります。

処理の流れ

今回の処理の流れは以下の通りです。

  1. テキストデータを読み込み、Sparkのデータフレーム型に取り込む
  2. 乗車・降車の緯度経度の情報からK -meansによるクラスタリングを行う
  3. 米国の休日カレンダーから休日のフラグを付与する
  4. 乗車・降車の緯度軽度から、出発地と目的地の距離を計算する
  5. 2から4までの計算値を機械学習モデルの入力値として、GBT(勾配ブースティング回帰)による推定モデルを学習させる
  6. モデル評価

ソースコードは、コチラで公開されています。以下より、各手順の説明をします。

テキストデータの読み込み

Sparkを動かすには、セッションを立ち上げる必要があります。以下のコードのような記述をし、実行するプログラムで継承させることで、Sparkを立ち上げて処理させることができます。


import org.apache.spark.sql.SparkSession

trait SparkSessionBase {
  val sparkSession: SparkSession = org.apache.spark.sql.SparkSession.builder
    .master("local[*]")
    .appName("KaggleNYCTaxi")
    .getOrCreate()
  sparkSession.sparkContext.setLogLevel("ERROR")

次に一連の処理を行うプログラムファイルに以下のような記述を行います。

import SimplePrediction.Cluster.{predictCluster, trainKMeans}
import SimplePrediction.Distance.makeHaversineDsirance
import SimplePrediction.Prediction.trainFarePredictionModel
import SimplePrediction.preProcess.{filterOutliers, makeHolidays, processTime}
import org.apache.spark.sql.functions.broadcast
import Util.SparkSessionBase
import Util.ReadData.{csvToDataFrame, getHolidayData}

object Main extends SparkSessionBase {
  def main(args: Array[String]): Unit = {

    val trainDataLink = "data/train.csv"
    val holidayDataLink = "data/usholidays.csv"
    val kmeansModelLink = "data/kmeans.model"
    val sample_fraction = 0.2

    var trainData = csvToDataFrame(sparkSession, trainDataLink)

...
  }
}

extends SparkSessionBase を継承することで、実行プログラム内でセッションを立ち上げさせることが可能になります。そして、sparkSessionを引数としてcsvToDataFrameに渡すことで、テキストデータを読み込むことができます。

def csvToDataFrame(sparkSession: SparkSession, link: String): DataFrame = {
    val customSchema = StructType(Array(
      StructField("key", StringType, true),
      StructField("fare_amount", DoubleType, true),
      StructField("pickup_datetime", StringType, true),
      StructField("pickup_longitude", DoubleType, true),
      StructField("pickup_latitude", DoubleType, true),
      StructField("dropoff_longitude", DoubleType, true),
      StructField("dropoff_latitude", DoubleType, true),
      StructField("passenger_count", IntegerType, true)
    ))

    val dfRaw = sparkSession.read
      .format("csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .schema(customSchema)
      .load(link)
      .withColumn("pickup_datetime", unix_timestamp(col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss zzz")
        .cast(TimestampType))

    val df = dfRaw
      .withColumn("pickup_point", st_makePoint(col("pickup_longitude"), col("pickup_latitude")))
      .withColumn("dropoff_point", st_makePoint(col("dropoff_longitude"), col("dropoff_latitude")))
    df
  }

上記の関数では、指定したデータのCSVファイルを読み込む手順を行なっています。各列のデータ型を定義し、最終的にデータフレームとして返しています。データフレームは、Sparkが提供するデータ型で、ビックデータを処理するための機能を利用することができます。

大まかな処理と結果

データの処理の詳細は、Githubのリポジトリでソースコードが上記のリンクより公開されています。実行プログラムのval sample_fraction という変数があります。これは、トレーニングデータセットの何割を学習に使用するか、というパラメータです。この数値が低いと少ないデータサンプルで処理を行うので、短い時間で処理することができ、高いスペックのコンピュータも必要としません。

プログラムを実行すると一連の処理が始まり、最後にモデルの精度が表示されます。結果は端末で以下のように表示されます。私の環境では0.9(9割のデータを利用する)で行いました。処理が終わるまで10分ほどの時間を要しました。Pythonで処理する場合、さらに長い時間(おそらく数日)がかかると思います。

今回のコンペティションの評価指標は、平均二乗誤差(RMSE)です。RMSEが低いほど良いということになります。コンペで1位となった人は1.385という精度でした。

より、高精度な推定モデルを開発する場合、モデルの改良を行う必要があると思います。

まとめ

Sparkによる大量データの処理について紹介しました。今回は、Kaggleのコンペティションのニューヨーク州のタクシーデータを利用しました。データのレコード数は5,000万以上となり、非常に大きなデータセットです。Sparkを利用することで、短時間にデータを処理することができました。今後、Sparkを使って、点群データなどの処理を行なっていきます。

Leave a Reply

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください