クラウド

EMR(Pyspark)でavro形式のファイルを読み書きしAthenaで参照する

PySpark周りは一度見つけたものはある程度まとめておかないと掘り起こすのが大変です。
今回はその一つである、avro形式の読み書きについてまとめておこうと思います。

EMR(Pyspark)でavro形式のファイルを読み書きしAthenaで参照する

EMR

【EMR(Pyspark)】バージョン

EMR 6.0.0
Spark 2.4.4
Python 3.7.4

ちょっと歴史

sparkでavroを読み書きするのは通称spark-avroと呼ばれるライブラリを利用する必要があります。
こちらは2.4.4では組み込みされているのですが、最初はdatabrics社によって開発され後にApache Spark移管されました。

【EMR(Pyspark)】利用の仕方

起動時に

<pre class="wp-block-code"><code>pyspark --packages org.apache.spark:spark-avro_2.12:2.4.4
もしくは
spark-submit hoge.py pyspark --packages org.apache.spark:spark-avro_2.12:2.4.4</code></pre>
pyspark --packages org.apache.spark:spark-avro_2.12:2.4.4
もしくは
spark-submit hoge.py pyspark --packages org.apache.spark:spark-avro_2.12:2.4.4

とPysparkのバージョンにあったライブラリを指定して起動する必要があります。

#適当にデータフレームを作成
df=spark.sql("select 1 as a,2 as b")
#外部パッケージだからなのか、orcやparquetなどとは書き込みの方法が異なる
df.write.mode("overwrite").format("avro").partitionBy("a").save("s3://hoge.bucket/user/hive/warehouse/adhoc.db/hogepekmaru/")
df = spark.read.format("avro").load("s3://hoge.bucket/user/hive/warehouse/adhoc.db/hogepekmaru/")


【EMR(Pyspark)】Hiveから参照してみる

まとまってるサイトないかなと、他力本願丸出しでググってきて見つけたサイト。
Hiveの場合は、テーブル定義をavro.schema.urlとurl 形式で外部スキーマを取り込むことができます。 How to Work With Avro Files - DZone Big Data A big data architect provides a tutorial on working with Avro dzone.com

' TBLPROPERTIES ('avro.schema.url'='hdfs:///user/hive/warehouse/avsc/employee.avsc')

こうすることで、エンジニアの好きな設定ファイルの外だしを行うことができる&外部からも一貫したスキーマを参照できるのです。

【EMR(Pyspark)】AWS Athenaから参照する

Hiveは便利に、上記のように外部にスキーマレジストリを置くことができるのですが、Athenaはそうもいきません。
その指定ができないので、原則Athenaを利用する場合は、avro.schema.literalをテーブルにプロパティにつけ、avsc内のjsonを直書きする必要があります。このliteral指定の方式は、hiveでも有効で、相互運用をしているところはコトラでの指定が必須になります(そうでないと別テーブルを作る必要がある)。
Amazon Athena - User Guide docs.amazonaws.cn


上記より抜粋

CREATE EXTERNAL TABLE flights_avro_example (
yr INT,
flightdate STRING,
uniquecarrier STRING,
airlineid INT,
carrier STRING,
flightnum STRING,
origin STRING,
dest STRING,
depdelay INT,
carrierdelay INT,
weatherdelay INT
)
PARTITIONED BY (year STRING)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='
{
"type" : "record",
"name" : "flights_avro_subset",
"namespace" : "default",
"fields" : [ {
.....

Avroというリッチなスキーマが表現できる一方で、このようなベタ書きは相当きついです。


Glue(のり)

こうなってくると、もう運用が大変なわけです。avro-toolでスキーマ情報を抜き出して、avscを作成してテーブルにはっつけて、、
avroは柔軟ですが、運用が柔軟ではなくなってしまうのです。

ここで登場するのがAWS Glue Crawlerです。
Glueクローラーを動かすことによって、面倒だったliteralの部分を勝手にやってくれます!

日々の取り込みで、このクローラーを動かすことによって、変更されるスキーマにも対応できるしありがたいツールの一つです。
というかこれなしに、運用へ導入はできません。

Glue Crawlerを使う時はhove.avroというファイルがあれば問題ないです。
スキーマファイルは.avroに格納されているので、その中のデータをデシリアライズしてカラム情報を引っ張ってくることができるから(avscのように外にだして、それを参考にデータをデシリアライズすることもできます。)。

おまけ:Avroというもの

最初はとっつきにくいのですが、ストリーミングではこのavro(対抗としてprotocol buffer)での運用はマストです。
jsonだと速度が出ない、parquetだとリッチな型はできない。kafkaなどのストリーミングを使う時は、これ一択でというくらい便利です。

おまけ:Avro便利なんやで

avroは基本的に開発速度の違いを吸収するために作られました。
データ基盤の世界では開発元と依頼先の速度が必ずしも一致するわけではなく、一方が遅れて登場するということがしばしばです。
カラムの追加はや変更は日常茶飯事でかつ変更者が大量にいる一方、それを取り込む側の人数は少なく対応速度が合致しない場合などなど、パターンは様々です。

そのため、avroには後方(前方)互換と呼ばれる、機能があり片方が変わっても片方が影響を受けないという仕組みを利用することができます。

kafka + avro はその走りで、それ専用のスキーマレジストリを用意してくれている会社があるくらい。

是非jsonなどのrawフォーマットを脱却し、Avroに乗り換えて見てください!

スポンサーリンク

スポンサーリンク

PVアクセスランキング にほんブログ村
  • この記事を書いた人

ヒゲ

とある会社でエンジニアをやりつつ、家族のためにせっせと投資に励むオジさん。ネット上ではヒゲと呼ばれることが多い。是非フォローして下さい。

-クラウド

© 2021 家族経営ブログ