YousifS7 opened a new issue, #11662:
URL: https://github.com/apache/hudi/issues/11662

   Dear Hudi Team,
   
   We have a data infrastructure that involves SQL Server CDC-enabled tables 
being ingested by Debezium into Kafka compacted topics. 
   We recently started experimenting with 
`org.apache.hudi.utilities.streamer.HoodieStreamer` and were able to build a 
pipeline that ingests CDC data from Kafka and writes it incrementally into 
Apache Hudi tables.
   
   We discovered that, currently, there is no `DebeziumSource` or 
`DebeziumAvroPayload` implementation available for SQL Server-based Debezium 
sources. While we recognize that it is possible to develop custom 
implementations for these classes, we have instead explored alternative 
approaches using existing classes and functionalities.
   We would like to confirm whether this alternative approach is valid and 
whether it will maintain data integrity and consistency.
   
   **Setup Details:**
   
   To substitute for a SQL Server-based `DebeziumSource` and 
`DebeziumAvroPayload`, we used the SQL file-based transformer 
(`org.apache.hudi.utilities.transform.SqlFileBasedTransformer`).
   
   > Spark-Submit
   ```
   spark-submit \
     --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws-bundle:0.15.0 \
     --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --class org.apache.hudi.utilities.streamer.HoodieStreamer \
     s3://some_bucket/libs/hudi-utilities-slim-bundle.jar \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
     --props s3://some_bucket/configs/some_table_name.properties \
     --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --enable-sync \
     --source-ordering-field ts_ms \
     --target-base-path s3://some_bucket/some_folder/some_table_name \
     --target-table some_table_name \
     --table-type COPY_ON_WRITE \
     --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
     --op UPSERT \
     --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer
   ```
   
   > Properties File
   ```
   hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table_name.sql
   hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table_name-value/versions/latest
   hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/sometargettable/versions/latest
   schema.registry.url=http://localhost:8081
   hoodie.streamer.source.kafka.topic=test.dbo.some_table_name
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.datasource.write.partitionpath.field=partition_path
   hoodie.datasource.write.recordkey.field=hudi_key
   hoodie.datasource.write.precombine.field=ts_ms
   hoodie.datasource.hive_sync.database=test_db
   ```
   
   > SQL Transformer File
   ```
   CACHE TABLE dbz_filtered AS
   SELECT ts_ms, op, before, after FROM <SRC> WHERE op IN ('d', 'u', 'c', 'r');
   
   CACHE TABLE dbz_events AS
   SELECT ts_ms,
          CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
          CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
   FROM dbz_filtered;
   
   CACHE TABLE dbz_fields AS
   SELECT ts_ms, source_fields.* FROM dbz_events;
   
   SELECT s.*,
          CONCAT(s.col1, s.col2) AS hudi_key,
          YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path
   FROM dbz_fields s;
   ```
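
To make the intent of the SQL file concrete, here is a minimal sketch of the same flattening in plain Python, applied to one Debezium envelope represented as a dict. The function name `flatten_envelope`, the sample values, and the use of UTC for the year are illustrative assumptions (Spark's `FROM_UNIXTIME` uses the session time zone); `col1` and `col2` are the same placeholders as in the SQL. This is not Hudi or Spark code.

```python
from datetime import datetime, timezone

# Debezium operation codes kept by the first statement of the SQL file.
KEPT_OPS = {"c", "r", "u", "d"}


def flatten_envelope(event):
    """Flatten one Debezium envelope the way the SQL file does.

    Returns None for events whose op is filtered out.
    """
    if event["op"] not in KEPT_OPS:
        return None

    # Deletes carry their data in `before`; every other op uses `after`.
    image = event["before"] if event["op"] == "d" else event["after"]

    # Equivalent of `SELECT ts_ms, source_fields.*`. As in the SQL file,
    # `is_deleted` is not carried into the final row.
    row = {"ts_ms": event["ts_ms"], **image}

    # Equivalent of CONCAT(col1, col2) and YEAR(FROM_UNIXTIME(col2 / 1000)).
    row["hudi_key"] = f'{image["col1"]}{image["col2"]}'
    row["partition_path"] = datetime.fromtimestamp(
        image["col2"] / 1000, tz=timezone.utc
    ).year
    return row


if __name__ == "__main__":
    update = {
        "op": "u",
        "ts_ms": 1700000000500,
        "before": {"col1": "a", "col2": 1600000000000},
        "after": {"col1": "a", "col2": 1700000000000},
    }
    print(flatten_envelope(update))
```

One thing the sketch makes visible: the `is_deleted` flag computed in `dbz_events` is not selected into `dbz_fields`, so the final output row for a delete looks like any other row built from the `before` image.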
   
   **Environment Description**
   
   * Hudi version : 0.15
   
   * Spark version : 3.5.0
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage : S3
   
   * Running on Docker? : No
   
   The source class in use is 
`org.apache.hudi.utilities.sources.AvroKafkaSource`. Since we do not set a 
payload class explicitly, we believe the default applies: 
`org.apache.hudi.common.model.OverwriteWithLatestAvroPayload`.
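
Our understanding, which we would like confirmed, is that with this payload class the records in a batch that share a record key are reduced to the one with the highest ordering value (`ts_ms` in our configuration) before the upsert. The sketch below illustrates that latest-wins rule in plain Python. The function name `latest_per_key` and the tie-breaking choice (the later row wins on equal `ts_ms`) are our assumptions for illustration; this is not Hudi's API.

```python
def latest_per_key(rows, key_field="hudi_key", ordering_field="ts_ms"):
    """Keep, for each key, the row with the highest ordering value.

    On equal ordering values the row seen later replaces the earlier one
    (an assumption made for this sketch).
    """
    latest = {}
    for row in rows:
        key = row[key_field]
        current = latest.get(key)
        if current is None or row[ordering_field] >= current[ordering_field]:
            latest[key] = row
    return list(latest.values())


if __name__ == "__main__":
    batch = [
        {"hudi_key": "k1", "ts_ms": 100, "value": "insert"},
        {"hudi_key": "k1", "ts_ms": 300, "value": "second update"},
        {"hudi_key": "k1", "ts_ms": 200, "value": "first update"},
        {"hudi_key": "k2", "ts_ms": 150, "value": "insert"},
    ]
    for row in latest_per_key(batch):
        print(row)
```

If this reading is right, out-of-order events within a batch are handled by `ts_ms`, and the remaining question is how events for the same key that arrive in different batches are resolved.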
   
   With this configuration the job runs and appears to behave as expected. 
However, we would like confirmation that this is a sound way to use 
`HoodieStreamer` for our use case: ingesting CDC (Change Data Capture) 
changes from SQL Server via Kafka into Apache Hudi tables while maintaining 
data integrity and consistency.
   
   Thank you
   
   

