Glueのdynamicframeの内容をCloudwatch logsに載せる(前編)

Glueのdynamicframeの内容をCloudwatch logsに載せる(前編)
この記事をシェアする

はじめに

こんにちは。スカイアーチHRソリューションズのmikaです。
今回は(自分の失敗経験談を絡めて)

  1. GlueジョブでData Catalogにデータを書き込みしたが、一部データが挿入されていなかった。
  2. 元のCSVデータから読み出しを失敗したのか?それともData Catalog書き込み時にデータが欠落したのか?Cloudwatch logsのGlueジョブログから調査する。

・・といった流れの元に、調査で使用したGlueのdynamicframeの内容をCloudwatch logsに載せる方法を紹介します。

構成

S3バケットに格納したCSVデータをGlueジョブで読み出し、データ型を変換した上でData Catalogに書き込みした後、Athenaより検索することでData Catalogに正しくデータが追加されているかを確認します。

なお、今回S3に格納したCSVデータはこちらになります。

観測所番号,都道府県,日時
11001,北海道,2023/4/1
11016,東京都,2023/4/2
11046,神奈川県,2023/4/3
11061,大阪府,2023/4/4
11076,京都府,2023/4/5
11091,兵庫県,2023/4/6

Glueジョブスクリプトで行っていること

Glueジョブ内のスクリプト全文はこちらになります。
・CSVデータ格納場所:s3://<S3バケットのURL>/<CSV格納フォルダ>/<CSV データファイル名>.csv
・Data Catalogデータベース指定:database=”<Data Catalogデータベース名>”
・Data Catalogテーブル指定:table_name=”<Data Catalogテーブル名>”
については、適宜名前を指定してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            "s3://<S3バケットのURL>/<CSV格納フォルダ>/<CSVデータファイル名>.csv"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("観測所番号", "string", "観測所番号", "string"),
        ("都道府県", "string", "都道府県", "string"),
        ("日時", "string", "日時", "timestamp"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node Data Catalog table
DataCatalogtable_node3 = glueContext.write_dynamic_frame.from_catalog(
    frame=ApplyMapping_node2,
    database="<Data Catalogデータベース名>",
    table_name="<Data Catalogテーブル名>",
    transformation_ctx="DataCatalogtable_node3",
)

job.commit()

このうち「S3バケットに格納したCSVデータをGlueジョブで読み出し」を実行している箇所はこちら。create_dynamic_frame.from_optionsにて、S3バケット上のCSVデータを取得します。

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            "s3://<S3バケットのURL>/<CSV格納フォルダ>/<CSVデータファイル名>.csv"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

「データ型を変換した上で」を実行している箇所はこちら。ApplyMappingにて、元のCSVデータのデータ型を変換しています。

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("観測所番号", "string", "観測所番号", "string"),
        ("都道府県", "string", "都道府県", "string"),
        ("日時", "string", "日時", "timestamp"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

最後に「Data Catalogに書き込み」を実行している箇所はこちらになります。write_dynamic_frame.from_catalogにて、Data Catalogにデータ型変換済のデータを書き込みします。

# Script generated for node Data Catalog table
DataCatalogtable_node3 = glueContext.write_dynamic_frame.from_catalog(
    frame=ApplyMapping_node2,
    database="<Data Catalogデータベース名>",
    table_name="<Data Catalogテーブル名>",
    transformation_ctx="DataCatalogtable_node3",
)

Glueジョブを実行してAthenaで検索

Glueジョブ(本編ではジョブ名「pre1h00_rct」)を実行し、正常終了しました。早速Athenaで検索してみましょう。

AthenaよりSelect文で全データを検索したところ・・「日時」データが挿入されていません!元のCSVにもちゃんとデータは入っていたし、どこでデータが欠落したのでしょうか?

Glueジョブ実行ログについて

Glueジョブ詳細画面にて「Cloudwatch logs」のリンクをクリックすると、ジョブ実行ログ画面に遷移します。
今回ジョブ自体はエラー無く終了しているので「Output logs」を選択します。

しかしながら有用な情報は得られませんでした。各処理でdynamicframeで実際に取得したテーブル情報が知りたいですね。

Glueのdynamicframeの内容をCloudwatch logsに載せる

Cloudwatch logsの「Output logs」にdynamicframeで実際に取得したテーブル情報を載せるには、Glueジョブスクリプト内に.toDF().show(truncate=False)の一文を追加します。
まずは「S3バケットに格納したCSVデータをGlueジョブで読み出し」実行箇所の最後に

<transformation_ctxで指定した名前>.toDF().show(truncate=False)

を、追加します。本編ではtransformation_ctxで指定した名前は「S3bucket_node1」ですので、S3bucket_node1.toDF().show(truncate=False)の一文を追加しています。

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            "s3://<S3バケットのURL>/<CSV格納フォルダ>/<CSVデータファイル名>.csv"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)
S3bucket_node1.toDF().show(truncate=False)

また、「データ型を変換した上で」実行箇所にも追加します。元のCSVから正しくデータ取得しているのに、ApplyMappingによるデータ型変換で失敗している可能性もあるからです。
本編ではtransformation_ctxで指定した名前は「ApplyMapping_node2」ですので、ApplyMapping_node2.toDF().show(truncate=False)の一文を追加しています。

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("観測所番号", "string", "観測所番号", "string"),
        ("都道府県", "string", "都道府県", "string"),
        ("日時", "string", "日時", "timestamp"),
    ],
    transformation_ctx="ApplyMapping_node2",
)
ApplyMapping_node2.toDF().show(truncate=False)

後編予告

Glueのdynamicframeの内容をCloudwatch logsに載せる(後編)では、dynamicframeで実際に取得したテーブル情報を載せたCloudwatch logsの「Output logs」を確認し、データがどこで欠落しているかを突き止めます。

この記事をシェアする
著者: