Glueを利用し、S3にアップロードしたCSVファイルをParquet形式に変換する

2024.05.20
Glueを利用し、S3にアップロードしたCSVファイルをParquet形式に変換する
この記事をシェアする

はじめに

こんにちは!スカイアーチHRソリューションズのiizakaです。
先日、AWS Certified Data Engineer – Associate【DEA-C01】認定試験を受けてきました。
試験の中でこんな問題がありました。
“CSV形式のデータをS3データレイクに取り込む必要がある。コスト効率が良いデータセットへのクエリの方法は、Glueを使いParquet形式でデータレイクに取り込むジョブを構成すること”
というような趣旨でした。
Glueを利用したことないけど、Glueで変換できるんか?Parquet形式ってなんだ?
と気になったので実際にやってみました。

整理

やりたいこと

S3バケットへCSV形式のファイルをアップロードする。
自動的にCSV形式のファイルをParquet形式に変換して保存する。

必要なリソース

① CSVファイルをアップロードするS3バケットの作成
② CSVファイルのスキーマを検出し、Parquet形式に変換するGlueジョブの作成
③ Glueジョブを実行させるLambda関数の作成
④ Lambda関数を実行させるS3イベント通知の設定

前提調査

AWS Glueとは

AWS Glueは、完全マネージド型のETL(Extract, Transform, Load)サービス。データの抽出、変換、およびロードのプロセスを簡素化および自動化するためのツールを提供し、データ統合や分析を効率的に行えるようにする。
主なコンポーネントは下記です。

◆Glueクローラー
データストア(S3バケット、RDS、DynamoDBなど)をスキャンしてデータのスキーマを自動的に検出し、Glueデータカタログにメタデータとして登録する。

◆Glueデータカタログ
データのメタデータを管理するための一元化されたリポジトリ。データセットのスキーマ情報やテーブル定義を格納し、ETLジョブやクエリ処理のためのデータの参照を容易にする。

◆Glueジョブ
ETL処理を実行するためのユニット。データの抽出、変換、ロードを自動化するスクリプトを実行する。ジョブは、Glueデータカタログからデータを読み込み、指定されたターゲットにデータを出力する。

◆Glueスタジオ
グラフィカルなユーザインタフェースを提供し、ETLジョブの開発、テスト、およびデプロイを容易にするツール。コードを書くことなく、ビジュアルインターフェースを使用してETLワークフローを設計できる。

Parquet形式とは

Apache Parquetプロジェクトによって提供されるカラム指向のデータストレージ形式。特にビッグデータ処理と分析のために設計されており、高い圧縮率と効率的なクエリ性能を提供する。

リソース作成

S3バケットの作成

CSVファイルをアップロードするS3バケットを作成し、
inputフォルダ(CSVファイルのアップロード先)とoutputフォルダ(Parquetファイルの保存先)を作成します。

AWS Glueジョブの作成

Glueクローラーの作成

GlueクローラーはS3バケット内のCSVファイルのスキーマを自動的に検出し、Glueデータカタログにメタデータを登録します。

クローラーの名前を入力します。
データストアに先ほど作成したS3バケットを選択し、データの場所を指定します。
IAMロールを新しく作成します。S3のデータを読み取るポリシーを付与します。
新しいターゲットデータベースを作成します。

Glueジョブの作成

GlueジョブはGlueデータカタログに登録されたCSVデータを読み込み、必要な変換を行い、Parquet形式でS3に保存します。

Glueスタジオの「Script editor」から作成します。
ジョブの名前を入力し、S3へのアクセスを許可するポリシーを付与したロールを設定します。LanguageにはPythonを選択します。スクリプトに下記コードを記述します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# データの読み込み
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "csv-database", table_name = "input", transformation_ctx = "datasource0")

# Parquet形式に変換
datasink2 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://upload-bucket-izk/output"}, format = "parquet", transformation_ctx = "datasink2")

job.commit()

Lambda関数の作成

Glueジョブを実行させるLambda関数を作成します。
関数を作成し、下記コードを記述します。

import boto3

def lambda_handler(event, context):
    glue = boto3.client('glue')
    job_name = 'csv-to-parquet-job'
    
    response = glue.start_job_run(JobName=job_name)
    
    return {
        'statusCode': 200,
        'body': f'Glue job {job_name} started successfully.'
    }

S3イベント通知の作成

CSVファイルが該当のバケットにアップロードされた際にLambda関数を実行するイベント通知を作成します。

CSVファイルをアップロードするS3バケットにイベント通知を作成します。
送信先に先ほど作成したLambda関数を指定します。

やってみる

今回の検証で使うCSVファイルは下記です。

名前,年齢,性別,出身
田中太郎,30,男性,東京都
田中花子,25,女性,神奈川県
田中一郎,35,男性,大阪府
田中美咲,40,女性,愛知県
田中次郎,28,男性,北海道
CSVファイルをinput/にアップロードします。
するとoutput/にparquet形式のファイルが保存されました。

ちなみにparquet形式になったファイルをダウンロードして開こうとしましたがテキストエディタでは開けませんでした。
parquet形式はバイナリ形式であり人間が直接読むことはできない形式みたいです。

さいごに

今回の検証ではCSVファイルをGlueを利用してParquet形式に変換できることが確認できました。Glueに対しての理解が少し深まりました。次回はParquet形式のデータをAthenaを使って確認したり、Parquet形式に変換することによりどのくらいクエリ性能が向上するのか確認するところまで踏み込めたらなと思います。

この記事をシェアする
著者:iizaka
テニスや登山、ラーメンが好きなAWSエンジニア。 2023 Japan AWS All Certifications Engineers