[GitHub] [hudi] t0il3ts0ap edited a comment on issue #2589: [SUPPORT] Issue with adding column while running deltastreamer with kafka source.

2021-03-29 Thread GitBox


t0il3ts0ap edited a comment on issue #2589:
URL: https://github.com/apache/hudi/issues/2589#issuecomment-809935902


   @nsivabalan That's awesome. I will check it out.
   We have recently shifted to a COW table and are not facing the column-addition issue there.
   I will check out column addition on a MOR table once without a schema provider and post my findings here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] t0il3ts0ap edited a comment on issue #2589: [SUPPORT] Issue with adding column while running deltastreamer with kafka source.

2021-02-25 Thread GitBox


t0il3ts0ap edited a comment on issue #2589:
URL: https://github.com/apache/hudi/issues/2589#issuecomment-786029402


   @satishkotha Ran again on a fresh table, still the same issue.
   
   SparkSubmit:
   ```
   spark-submit \
   --master yarn \
   --packages org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
   --conf spark.scheduler.mode=FAIR \
   --conf spark.task.maxFailures=5 \
   --conf spark.rdd.compress=true \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.shuffle.service.enabled=true \
   --conf spark.sql.hive.convertMetastoreParquet=false \
   --conf spark.executor.heartbeatInterval=120s \
   --conf spark.network.timeout=600s \
   --conf spark.yarn.max.executor.failures=5 \
   --conf spark.sql.catalogImplementation=hive \
   --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
   --deploy-mode client \
   s3://navi-emr-poc/delta-streamer-test/jars/deltastreamer-addons-1.0-SNAPSHOT.jar \
   --enable-sync \
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
   --hoodie-conf hoodie.parquet.compression.codec=snappy \
   --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
   --hoodie-conf auto.offset.reset=latest \
   --hoodie-conf hoodie.avro.schema.validate=true \
   --table-type MERGE_ON_READ \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
   --props s3://navi-emr-poc/delta-streamer-test/config/kafka-source-nonprod.properties \
   --transformer-class com.navi.transform.DebeziumTransformer \
   --continuous \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://dev-dataplatform-schema-registry.np.navi-tech.in/subjects/to_be_deleted_service.public.accounts-value/versions/latest \
   --hoodie-conf hoodie.datasource.hive_sync.database=to_be_deleted_service \
   --hoodie-conf hoodie.datasource.hive_sync.table=accounts \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
   --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=to_be_deleted_service.public.accounts \
   --hoodie-conf group.id=delta-streamer-to_be_deleted_service-accounts \
   --source-ordering-field __lsn \
   --target-base-path s3://navi-emr-poc/raw-data/to_be_deleted_service/accounts \
   --target-table accounts
   ```
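   
   For reference, the schema provider above is driven by `hoodie.deltastreamer.schemaprovider.registry.url`. A minimal sketch of what such a `versions/latest` registry URL serves (this only illustrates the Confluent-style REST response, not the actual provider implementation; the class and method names below are hypothetical):
   
   ```
   import java.io.InputStream;
   import java.net.URL;

   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import org.apache.avro.Schema;

   // Hypothetical helper: fetches the latest subject version from a Confluent-style
   // schema registry and parses the "schema" field of the JSON response as Avro.
   public class RegistrySchemaFetch {
       public static Schema fetchLatest(String registryUrl) throws Exception {
           try (InputStream in = new URL(registryUrl).openStream()) {
               JsonNode node = new ObjectMapper().readTree(in);
               return new Schema.Parser().parse(node.get("schema").asText());
           }
       }
   }
   ```
   
   Whatever "latest" resolves to at fetch time is the schema the streamer works with, which matters for the timing issue described further down.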
   
   Transformer Code:
   ```
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.utilities.transform.Transformer;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   import org.apache.spark.sql.types.DataTypes;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class DebeziumTransformer implements Transformer {

       // slf4j logger assumed for the log.info calls below
       private static final Logger log = LoggerFactory.getLogger(DebeziumTransformer.class);

       @Override
       public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession,
           Dataset<Row> dataset, TypedProperties typedProperties) {

           // Cast the Debezium "__deleted" flag to boolean, expose it as Hudi's
           // _hoodie_is_deleted marker, and drop the remaining Debezium metadata columns.
           Dataset<Row> transformedDataset = dataset
               .withColumn("__deleted", dataset.col("__deleted").cast(DataTypes.BooleanType))
               .withColumnRenamed("__deleted", "_hoodie_is_deleted")
               .drop("__op", "__source_ts_ms");

           log.info("TRANSFORMER SCHEMA STARTS");
           transformedDataset.printSchema();
           transformedDataset.show();
           log.info("TRANSFORMER SCHEMA ENDS");
           return transformedDataset;
       }
   }
   ```
   
   When I add the column, Debezium updates the schema registry almost instantaneously and records with the new schema start flowing. It's possible that DeltaStreamer picks up records written with the new schema before it even refetches the schema from the registry.
   
   ```
   Caused by: org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source, expecting hoodie.source.hoodie_source, missing required field test
   ```
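   
   For context on the exception: this is Avro's standard schema-resolution failure when the reader schema contains a field (here the newly added `test` column) that the writer schema lacks and that has no default value. A small, self-contained sketch that reproduces the same message (the record layout and the single `id` field are illustrative):
   
   ```
   import java.io.ByteArrayOutputStream;

   import org.apache.avro.AvroTypeException;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class AvroMissingFieldDemo {
       public static void main(String[] args) throws Exception {
           // Writer schema: the old record shape, before the column was added.
           Schema writerSchema = new Schema.Parser().parse(
               "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\","
                   + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

           // Reader schema: the new shape with an added required field and no default.
           Schema readerSchema = new Schema.Parser().parse(
               "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\","
                   + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
                   + "{\"name\":\"test\",\"type\":\"string\"}]}");

           // Serialize one record with the old (writer) schema.
           GenericRecord record = new GenericData.Record(writerSchema);
           record.put("id", 1L);
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
           new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
           encoder.flush();

           // Decoding it with the new schema as reader fails, because "test" has no default.
           try {
               new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                   .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
           } catch (AvroTypeException e) {
               // Prints: Found hoodie.source.hoodie_source, expecting hoodie.source.hoodie_source,
               // missing required field test
               System.out.println(e.getMessage());
           }
       }
   }
   ```
   
   In Avro terms, records written with the old schema cannot be resolved against the new one unless the added field carries a default.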
   
   Attaching logs:
   [alog.txt](https://github.com/apache/hudi/files/6044367/alog.txt)
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



