GlueのJDBC並列読取りのlowerBound、upperBoundを動的に求めたい


Glueの並列読取りのためのプロパティのlowerBound、upperBound。

パーティションカラムの最大値と最小値をジョブ実行のたびに動的に求めて設定できるようにしてみました。
DATABASE_USER = 'USER'
DATABASE_PASSWORD = 'PASSWORD'
TABLE_NAME = 'MY_TABLE'
PARTITION_COLUMN = 'ID'

query = "(SELECT max({0}), min({0}) FROM {1}) sub".format(
    PARTITION_COLUMN, TABLE_NAME
)
properties = {
    "user": DATABASE_USER,
    "password": DATABASE_PASSWORD,
}
​(upper_bound, lower_bound) = (spark.read
    .jdbc(url=JDBC_URL, table=query, properties=properties)
    .first())

df = spark \
    .read \
    .format("jdbc") \
    .option("url", JDBC_URL) \
    .option("user", DATABASE_USER) \
    .option("password", DATABASE_PASSWORD) \
    .option("dbtable", TABLE_NAME) \
    .option("numPartitions", 100) \
    .option("partitionColumn", PARTITION_COLUMN ) \
    .option("lowerBound", lower_bound) \
    .option("upperBound", upper_bound) \
    .load()
自動でチューニングできるので便利です。

numPartitionsも、「( upper_bound – lower_bound ) / 1つのワーカーで処理したいレコード数 = numPartitions」の式で求められそうです。

おまけで日付型でやるパターン(Oracle Databaseの場合)
DATABASE_USER = 'USER'
DATABASE_PASSWORD = 'PASSWORD'
TABLE_NAME = 'MY_TABLE'
PARTITION_COLUMN = 'YMD'

query = "(SELECT max({0}), min({0}) FROM {1}) sub".format(
    PARTITION_COLUMN, TABLE_NAME
)
properties = {
    "user": DATABASE_USER,
    "password": DATABASE_PASSWORD,
}
​(upper_bound, lower_bound) = (spark.read
    .jdbc(url=JDBC_URL, table=query, properties=properties)
    .first())

lower_bound = lower_bound.strftime('%Y-%m-%d')
upper_bound = upper_bound.strftime('%Y-%m-%d')
num_partition = (upper_bound - lower_bound).days # 日数を求める

df = spark \
    .read \
    .format("jdbc") \
    .option("url", JDBC_URL) \
    .option("user", DATABASE_USER) \
    .option("password", DATABASE_PASSWORD) \
    .option("dbtable", TABLE_NAME) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("numPartitions", num_partition) \
    .option("partitionColumn", PARTITION_COLUMN ) \
    .option("lowerBound", lower_bound) \
    .option("upperBound", upper_bound) \
    .option("oracle.jdbc.mapDateToTimestamp", "false") \
    .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'") \
    .load()