[GitHub] [hudi] t0il3ts0ap edited a comment on issue #2589: [SUPPORT] Issue with adding column while running deltastreamer with kafka source.
t0il3ts0ap edited a comment on issue #2589:
URL: https://github.com/apache/hudi/issues/2589#issuecomment-809935902

@nsivabalan That's awesome. I will check it out. We recently shifted to a COW table and are not facing the column-addition issue there. I will try column addition on a MOR table once without the schema provider and post my findings here.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] t0il3ts0ap edited a comment on issue #2589: [SUPPORT] Issue with adding column while running deltastreamer with kafka source.
t0il3ts0ap edited a comment on issue #2589:
URL: https://github.com/apache/hudi/issues/2589#issuecomment-786029402

@satishkotha Ran again on a fresh table; still the same issue.

SparkSubmit:
```
spark-submit --master yarn \
  --packages org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.task.maxFailures=5 \
  --conf spark.rdd.compress=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.executor.heartbeatInterval=120s \
  --conf spark.network.timeout=600s \
  --conf spark.yarn.max.executor.failures=5 \
  --conf spark.sql.catalogImplementation=hive \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
  --deploy-mode client \
  s3://navi-emr-poc/delta-streamer-test/jars/deltastreamer-addons-1.0-SNAPSHOT.jar \
  --enable-sync \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --hoodie-conf hoodie.parquet.compression.codec=snappy \
  --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
  --hoodie-conf auto.offset.reset=latest \
  --hoodie-conf hoodie.avro.schema.validate=true \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
  --props s3://navi-emr-poc/delta-streamer-test/config/kafka-source-nonprod.properties \
  --transformer-class com.navi.transform.DebeziumTransformer \
  --continuous \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://dev-dataplatform-schema-registry.np.navi-tech.in/subjects/to_be_deleted_service.public.accounts-value/versions/latest \
  --hoodie-conf hoodie.datasource.hive_sync.database=to_be_deleted_service \
  --hoodie-conf hoodie.datasource.hive_sync.table=accounts \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
  --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=to_be_deleted_service.public.accounts \
  --hoodie-conf group.id=delta-streamer-to_be_deleted_service-accounts \
  --source-ordering-field __lsn \
  --target-base-path s3://navi-emr-poc/raw-data/to_be_deleted_service/accounts \
  --target-table accounts
```

Transformer Code:
```
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class DebeziumTransformer implements Transformer {
  // The original snippet does not show how `log` is declared; a log4j logger is assumed here.
  private static final Logger log = LogManager.getLogger(DebeziumTransformer.class);

  public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession,
      Dataset<Row> dataset, TypedProperties typedProperties) {
    // Cast Debezium's __deleted flag to boolean and expose it as Hudi's delete marker.
    Dataset<Row> transformedDataset = dataset
        .withColumn("__deleted", dataset.col("__deleted").cast(DataTypes.BooleanType))
        .withColumnRenamed("__deleted", "_hoodie_is_deleted")
        .drop("__op", "__source_ts_ms");
    log.info("TRANSFORMER SCHEMA STARTS");
    transformedDataset.printSchema();
    transformedDataset.show();
    log.info("TRANSFORMER SCHEMA ENDS");
    return transformedDataset;
  }
}
```

When I add the column, Debezium updates the schema registry instantaneously and records with the new schema start flowing. It's possible that DeltaStreamer receives records with the new schema before it even queries the schema registry.

```
Caused by: org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source, expecting hoodie.source.hoodie_source, missing required field test
```

Attaching logs: [alog.txt](https://github.com/apache/hudi/files/6044367/alog.txt)
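For context on the `AvroTypeException` quoted above: Avro reports "missing required field" when the schema it is reading with (the "expecting" side) declares a field that the schema the data was written with (the "Found" side) lacks, and that field has no default value. The sketch below is a simplified, standard-library-only model of that one rule, not Avro's API. The class `SchemaResolutionSketch`, the type `ReaderField`, and the method `firstMissingRequiredField` are hypothetical names chosen for illustration; the field names `id`, `__lsn`, and `test` come from the report. Whether declaring a default for the new column resolves this particular issue is not established in the thread.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Simplified model of Avro's "missing required field" check (not Avro's API). */
public class SchemaResolutionSketch {

    /** A field of the reader (expected) schema: its name and whether it declares a default. */
    public static final class ReaderField {
        final String name;
        final boolean hasDefault;

        public ReaderField(String name, boolean hasDefault) {
            this.name = name;
            this.hasDefault = hasDefault;
        }
    }

    /**
     * Returns the first reader field that the writer schema does not contain
     * and that has no default, or empty if every reader field can be resolved.
     */
    public static Optional<String> firstMissingRequiredField(
            Set<String> writerFields, List<ReaderField> readerFields) {
        for (ReaderField field : readerFields) {
            if (!writerFields.contains(field.name) && !field.hasDefault) {
                return Optional.of(field.name);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Data written before the column was added: no "test" field.
        Set<String> oldWriter = new HashSet<>(Arrays.asList("id", "__lsn"));

        // New reader schema where "test" has no default: resolution fails on "test".
        List<ReaderField> readerNoDefault = Arrays.asList(
                new ReaderField("id", false),
                new ReaderField("__lsn", false),
                new ReaderField("test", false));
        System.out.println(firstMissingRequiredField(oldWriter, readerNoDefault)); // Optional[test]

        // Same reader schema, but "test" declares a default: resolution succeeds.
        List<ReaderField> readerWithDefault = Arrays.asList(
                new ReaderField("id", false),
                new ReaderField("__lsn", false),
                new ReaderField("test", true));
        System.out.println(firstMissingRequiredField(oldWriter, readerWithDefault)); // Optional.empty
    }
}
```

In this model, the first case corresponds to the reported error message: old data is decoded against a schema that already requires `test`.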