Connecting from AWS Glue to Oracle RAC over JDBC
AWS Glue's managed JDBC connections don't accept an Oracle RAC multi-address connect descriptor. However, you can still connect by issuing the JDBC connection directly from PySpark. Here is a sample job that reads a table from Oracle RAC and writes it to S3 in CSV format.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime as dt
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
logger.info("***** START JOB *****")
# Connection settings
DB_HOST1 = 'xxx.xxx.xxx.xxx'
DB_HOST2 = 'yyy.yyy.yyy.yyy'
DB_USER = 'admin'
DB_PASS = '********************'
TABLE_NAME = 'MY_TABLE'
# Multi-address connect descriptor: connect-time load balancing and failover across both RAC nodes
JDBC_URL = 'jdbc:oracle:thin:@(DESCRIPTION=(LOAD_BALANCE=ON)(FAILOVER=ON)'
JDBC_URL += '(ADDRESS=(PROTOCOL=TCP)(HOST=' + DB_HOST1 + ')(PORT=1521))'
JDBC_URL += '(ADDRESS=(PROTOCOL=TCP)(HOST=' + DB_HOST2 + ')(PORT=1521))'
JDBC_URL += '(CONNECT_DATA=(SERVICE_NAME=ORCL)))'
source_df = spark \
    .read \
    .format("jdbc") \
    .option("url", JDBC_URL) \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .option("dbtable", TABLE_NAME) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
datasource0 = DynamicFrame.fromDF(source_df, glueContext, "datasource0")
# Column mappings
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [
        ("item_code", "string", "item_code", "string"),
        ("name", "string", "name", "string"),
        ("price", "decimal(9,0)", "price", "decimal(9,0)"),
        ("created_at", "timestamp", "created_at", "timestamp")],
    transformation_ctx = "applymapping1")
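# make_struct resolves any ambiguous "choice" column types into a struct holding each candidate type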
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
# Output settings
BUCKET_NAME = 'my_bucket'
datetime_now = dt.now()
date_string = datetime_now.strftime('%Y%m%d')
save_path = "s3://{0}/{1}/{2}".format(BUCKET_NAME, TABLE_NAME, date_string)
partition = 5
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
# Repartition so the output is written as a fixed number of CSV files
dropnullfields3 = dropnullfields3.toDF().repartition(partition)
dropnullfields3.write.format('csv').mode('overwrite').option("header", "true").save(save_path)
logger.info("***** END JOB *****")
job.commit()
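One caveat: the Oracle JDBC driver has to be on the job's classpath. If it isn't already available in your Glue environment, upload the ojdbc JAR to S3 and attach it via the job's "Dependent jars path" setting (the --extra-jars special parameter).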
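As written, the read goes through a single JDBC connection, so one executor pulls the whole table. Spark's JDBC source can parallelize the scan when given a numeric partition column. A minimal sketch, assuming MY_TABLE has a numeric ID column (hypothetical here) whose values fall roughly between 1 and 1000000:

# Spark issues numPartitions range queries over ID between lowerBound and upperBound
# fetchsize controls how many rows the Oracle driver fetches per round trip
source_df = spark \
    .read \
    .format("jdbc") \
    .option("url", JDBC_URL) \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .option("dbtable", TABLE_NAME) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("partitionColumn", "ID") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "4") \
    .option("fetchsize", "1000") \
    .load()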