soumilshah1995 commented on issue #9143: URL: https://github.com/apache/hudi/issues/9143#issuecomment-1631710874
# Project : Using Apache Hudi Deltastreamer and AWS DMS Hands on Labs ![image](https://user-images.githubusercontent.com/39345855/228927370-f7264d4a-f026-4014-9df4-b063f000f377.png) ------------------------------------------------------------------ Video Tutorials * Part 1: Project Overview : https://www.youtube.com/watch?v=D9L0NLSqC1s * Part 2: Aurora Setup : https://youtu.be/HR5A6iGb4LE * Part 3: https://youtu.be/rnyj5gkIPKA * Part 4: https://youtu.be/J1xvPIcDIaQ * part 5 https://youtu.be/On-6L6oJ158 * How to setup EMR cluster with VPC https://www.youtube.com/watch?v=-e1-Zsk17Ss&t=4s * PDF guide https://drive.google.com/file/d/1Hj_gyZ8o-wFf4tqTYZXOHNJz2f3ARepn/view ------------------------------------------------------------------ # Steps ### Step 1: Create Aurora Source Database and update the seetings to enable CDC on Postgres * Read More : https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html * Create new Create parameter group as shows in video and make sure to edit these two setting as shown in video part 2 ``` rds.logical_replication 1 wal_sender_timeout 300000 ``` once done please apply these to database and reboot your Database ### Step 2: Run Python file to create a table called sales in public schema in aurora and lets populate some data into table Run python run.py ``` try: import os import logging from functools import wraps from abc import ABC, abstractmethod from enum import Enum from logging import StreamHandler import uuid from datetime import datetime, timezone from random import randint import datetime import sqlalchemy as db from faker import Faker import random import psycopg2 import psycopg2.extras as extras from dotenv import load_dotenv load_dotenv(".env") except Exception as e: raise Exception("Error: {} ".format(e)) class Logging: """ This class is used for logging data to datadog an to the console. """ def __init__(self, service_name, ddsource, logger_name="demoapp"): self.service_name = service_name self.ddsource = ddsource self.logger_name = logger_name format = "[%(asctime)s] %(name)s %(levelname)s %(message)s" self.logger = logging.getLogger(self.logger_name) formatter = logging.Formatter(format, ) if logging.getLogger().hasHandlers(): logging.getLogger().setLevel(logging.INFO) else: logging.basicConfig(level=logging.INFO) global logger logger = Logging(service_name="database-common-module", ddsource="database-common-module", logger_name="database-common-module") def error_handling_with_logging(argument=None): def real_decorator(function): @wraps(function) def wrapper(self, *args, **kwargs): function_name = function.__name__ response = None try: if kwargs == {}: response = function(self) else: response = function(self, **kwargs) except Exception as e: response = { "status": -1, "error": {"message": str(e), "function_name": function.__name__}, } logger.logger.info(response) return response return wrapper return real_decorator class DatabaseInterface(ABC): @abstractmethod def get_data(self, query): """ For given query fetch the data :param query: Str :return: Dict """ def execute_non_query(self, query): """ Inserts data into SQL Server :param query: Str :return: Dict """ def insert_many(self, query, data): """ Insert Many items into database :param query: str :param data: tuple :return: Dict """ def get_data_batch(self, batch_size=10, query=""): """ Gets data into batches :param batch_size: INT :param query: STR :return: DICT """ def get_table(self, table_name=""): """ Gets the table from database :param table_name: STR :return: OBJECT """ class Settings(object): """settings class""" def __init__( self, port="", server="", username="", password="", timeout=100, database_name="", connection_string="", collection_name="", **kwargs, ): self.port = port self.server = server self.username = username self.password = password self.timeout = timeout self.database_name = database_name self.connection_string = connection_string self.collection_name = collection_name class DatabaseAurora(DatabaseInterface): """Aurora database class""" def __init__(self, data_base_settings): self.data_base_settings = data_base_settings self.client = db.create_engine( "postgresql://{username}:{password}@{server}:{port}/{database}".format( username=self.data_base_settings.username, password=self.data_base_settings.password, server=self.data_base_settings.server, port=self.data_base_settings.port, database=self.data_base_settings.database_name ) ) self.metadata = db.MetaData() logger.logger.info("Auroradb connection established successfully.") @error_handling_with_logging() def get_data(self, query): self.query = query cursor = self.client.connect() response = cursor.execute(self.query) result = response.fetchall() columns = response.keys()._keys data = [dict(zip(columns, item)) for item in result] cursor.close() return {"statusCode": 200, "data": data} @error_handling_with_logging() def execute_non_query(self, query): self.query = query cursor = self.client.connect() cursor.execute(self.query) cursor.close() return {"statusCode": 200, "data": True} @error_handling_with_logging() def insert_many(self, query, data): self.query = query print(data) cursor = self.client.connect() cursor.execute(self.query, data) cursor.close() return {"statusCode": 200, "data": True} @error_handling_with_logging() def get_data_batch(self, batch_size=10, query=""): self.query = query cursor = self.client.connect() response = cursor.execute(self.query) columns = response.keys()._keys while True: result = response.fetchmany(batch_size) if not result: break else: items = [dict(zip(columns, data)) for data in result] yield items @error_handling_with_logging() def get_table(self, table_name=""): table = db.Table(table_name, self.metadata, autoload=True, autoload_with=self.client) return {"statusCode": 200, "table": table} class DatabaseAuroraPycopg(DatabaseInterface): """Aurora database class""" def __init__(self, data_base_settings): self.data_base_settings = data_base_settings self.client = psycopg2.connect( host=self.data_base_settings.server, port=self.data_base_settings.port, database=self.data_base_settings.database_name, user=self.data_base_settings.username, password=self.data_base_settings.password, ) @error_handling_with_logging() def get_data(self, query): self.query = query cursor = self.client.cursor() cursor.execute(self.query) result = cursor.fetchall() columns = [column[0] for column in cursor.description] data = [dict(zip(columns, item)) for item in result] cursor.close() _ = {"statusCode": 200, "data": data} return _ @error_handling_with_logging() def execute(self, query, data): self.query = query cursor = self.client.cursor() cursor.execute(self.query, data) self.client.commit() cursor.close() return {"statusCode": 200, "data": True} @error_handling_with_logging() def get_data_batch(self, batch_size=10, query=""): self.query = query cursor = self.client.cursor() cursor.execute(self.query) columns = [column[0] for column in cursor.description] while True: result = cursor.fetchmany(batch_size) if not result: break else: items = [dict(zip(columns, data)) for data in result] yield items @error_handling_with_logging() def insert_many(self, query, data): self.query = query cursor = self.client.cursor() extras.execute_batch(cursor, self.query, data) self.client.commit() cursor.close() return {"statusCode": 200, "data": True} class Connector(Enum): ON_AURORA_PYCOPG = DatabaseAurora( data_base_settings=Settings( port="5432", server="XXXXXXXXX", username="postgres", password="postgres", database_name="postgres", ) ) def main(): helper = Connector.ON_AURORA_PYCOPG.value import time states = ("AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY") shipping_types = ("Free", "3-Day", "2-Day") product_categories = ("Garden", "Kitchen", "Office", "Household") referrals = ("Other", "Friend/Colleague", "Repeat Customer", "Online Ad") try: query = """ CREATE TABLE public.sales ( invoiceid INTEGER, itemid INTEGER, category TEXT, price NUMERIC(10,2), quantity INTEGER, orderdate DATE, destinationstate TEXT, shippingtype TEXT, referral TEXT ); """ helper.execute_non_query(query=query,) time.sleep(2) except Exception as e: print("Error",e) try: query = """ ALTER TABLE execute_non_query.sales REPLICA IDENTITY FULL """ helper.execute(query=query) time.sleep(2) except Exception as e: pass for i in range(0, 100): item_id = random.randint(1, 100) state = states[random.randint(0, len(states) - 1)] shipping_type = shipping_types[random.randint(0, len(shipping_types) - 1)] product_category = product_categories[random.randint(0, len(product_categories) - 1)] quantity = random.randint(1, 4) referral = referrals[random.randint(0, len(referrals) - 1)] price = random.randint(1, 100) order_date = datetime.date(2016, random.randint(1, 12), random.randint(1, 28)).isoformat() invoiceid = random.randint(1, 20000) data_order = (invoiceid, item_id, product_category, price, quantity, order_date, state, shipping_type, referral) query = """INSERT INTO public.sales ( invoiceid, itemid, category, price, quantity, orderdate, destinationstate,shippingtype, referral ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""" helper.insert_many(query=query, data=data_order) main() ``` ### Step 3: Create a DMS Replication Instance as shown in video 3 and create S3 bucket and iAM roles refer video part 3 ### Create Source and target in DMS * When you create target add following settings ``` { "CsvRowDelimiter": "\\n", "CsvDelimiter": ",", "BucketFolder": "raw", "BucketName": "XXXXXXXXXXXXX", "CompressionType": "NONE", "DataFormat": "parquet", "EnableStatistics": true, "IncludeOpForFullLoad": true, "TimestampColumnName": "replicadmstimestamp", "DatePartitionEnabled": false } ``` #### Note add this as well in Extra connection attribute ![image](https://user-images.githubusercontent.com/39345855/228972148-10726c19-678b-4d77-a607-77fd7eebb105.png) ``` parquetVersion=PARQUET_2_0; ``` ### Step 4: create Task add following settings ``` { "rules": [ { "rule-type": "selection", "rule-id": "861743510", "rule-name": "861743510", "object-locator": { "schema-name": "public", "table-name": "sales" }, "rule-action": "include", "filters": [] } ] } ``` # Create EMR cluster and fire the delta streamer * Note you can download parquert files https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link * these are sample data files generated from DMS you can directly copy into your S3 for learning purposes ``` spark-submit \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \ --conf spark.sql.hive.convertMetastoreParquet=false \ --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory \ --master yarn \ --deploy-mode client \ --executor-memory 1g \ /usr/lib/hudi/hudi-utilities-bundle.jar \ --table-type COPY_ON_WRITE \ --op UPSERT \ --enable-sync \ --source-ordering-field replicadmstimestamp \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \ --target-base-path s3://delta-streamer-demo-hudi/hudi/public/invoice \ --target-table invoice \ --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \ --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \ --hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \ --hoodie-conf hoodie.datasource.write.partitionpath.field=destinationstate \ --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://delta-streamer-demo-hudi/raw/public/sales \ --hoodie-conf hoodie.datasource.write.precombine.field=replicadmstimestamp \ --hoodie-conf hoodie.database.name=hudidb_raw \ --hoodie-conf hoodie.datasource.hive_sync.enable=true \ --hoodie-conf hoodie.datasource.hive_sync.database=hudidb_raw \ --hoodie-conf hoodie.datasource.hive_sync.table=tbl_invoices \ --hoodie-conf hoodie.datasource.hive_sync.partition_fields=destinationstate ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org