shaurya-nwse opened a new issue, #8486:
URL: https://github.com/apache/hudi/issues/8486

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   Yes
   - Join the mailing list to engage in conversations and get faster support at 
dev-subscr...@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   We've been running Hudi in production successfully for many of our tables. Our general use case is to create a Hive table from a Kafka topic, and we use the Hudi deltastreamer for this.
   Recently, an additional field (with a default value) was added to one of our topics, and the deltastreamer started throwing exceptions. The stacktrace points to what looks like a version mismatch between the Avro and Jackson versions used by the `parquet-avro` package.
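   
   To make what we suspect concrete (this is just a local sketch of ours, not code from Hudi or parquet-avro): `parquet-avro`'s `AvroRecordConverter` still calls `Schema.Field.defaultValue()`, which returned an `org.codehaus.jackson.JsonNode` in Avro 1.8.x but was removed in Avro 1.9+, where the default is exposed as a plain `Object` via `defaultVal()` instead. If only the newer Avro is on the classpath, resolving the old signature can only fail with the `NoSuchMethodError` shown further down.
   
   ```java
   import org.apache.avro.Schema;
   
   // Minimal sketch (our own illustration): how the default of the newly added field
   // is exposed on Avro 1.9+ vs. the removed 1.8-era API that parquet-avro still calls.
   public class AvroDefaultCheck {
       public static void main(String[] args) {
           String avsc = "{\"type\":\"record\",\"name\":\"WES\",\"fields\":["
               + "{\"name\":\"industry_ids\",\"type\":{\"type\":\"array\",\"items\":[\"null\",\"long\"]},\"default\":[]}]}";
           Schema.Field field = new Schema.Parser().parse(avsc).getField("industry_ids");
   
           // Avro 1.9+ (Spark 3.2 bundles Avro 1.10.x): the default comes back as an Object.
           System.out.println(field.defaultVal());   // prints [] (an empty list)
   
           // Avro <= 1.8 exposed the same thing as an org.codehaus.jackson.JsonNode:
           //   org.codehaus.jackson.JsonNode node = field.defaultValue();
           // That is the exact signature named in the NoSuchMethodError in the stacktrace.
       }
   }
   ```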
   
   
   **To Reproduce**
   We ran into this when saving the table on HDFS, but I could reproduce it 
locally as well. 
   
   Steps to reproduce the behavior:
   
   1. Create a deltastreamer configuration to consume an Avro topic from Kafka 
and write to HDFS
   ```properties
   hoodie.datasource.write.recordkey.field=work_experience_id
   hoodie.datasource.write.partitionpath.field=dt
   hoodie.datasource.write.precombine.field=dt
   hoodie.datasource.write.operation=upsert
   hoodie.table.name=wes
   
   hoodie.index.type=GLOBAL_BLOOM
   hoodie.deltastreamer.source.kafka.topic=wes
   hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/wes-value/versions/latest
   hoodie.metadata.enable=false
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   schema.registry.url=http://0.0.0.0:8081
   group.id=wes-only

   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.deltastreamer.source.kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
   
   hoodie.datasource.write.reconcile.schema=true
   ```
   2. The Avro schema on the schema registry prior to the change is as follows:
   ```avsc
   {
     "namespace": "com.ds.model.profile",
     "type": "record",
     "name": "WES",
     "fields": [
         {"name": "work_experience_id", "type": "long"},
         {"name": "profile_id", "type": "long", "default": -1},
         {"name": "primary_job", "type": "boolean"},
         {"name": "jobtitle", "type": "string"},
         {"name": "discipline_id", "type": ["null", "int"], "default": null},
         {"name": "industry_id", "type": ["null", "long"], "default": null},
         {"name": "dt", "type":  ["null","string"], "default":  null}
     ]
   }
   ```
   3. The initial run works fine and the Hudi table is created on HDFS/local FS.
   4. The schema is modified to add a new field with a default value:
   ```avsc
   {
     "namespace": "com.ds.model.profile",
     "type": "record",
     "name": "WES",
     "fields": [
         {"name": "work_experience_id", "type": "long"},
         {"name": "profile_id", "type": "long", "default": -1},
         {"name": "primary_job", "type": "boolean"},
         {"name": "jobtitle", "type": "string"},
         {"name": "discipline_id", "type": ["null", "int"], "default": null},
         {"name": "industry_id", "type": ["null", "long"], "default": null},
       // this field below was added (comment shown for illustration only; it is not part of the actual schema)
       {"name": "industry_ids", "type": {"type": "array", "items": ["null", "long"]}, "default": []},
       {"name": "dt", "type": ["null", "string"], "default": null}
     ]
   }
   ```
   5. The deltastreamer job fails with: `Caused by: java.lang.NoSuchMethodError: 'org.codehaus.jackson.JsonNode org.apache.avro.Schema$Field.defaultValue()'`
   
   The timeline shows rollback markers as the attempt to commit fails.
   
   **Expected behavior**
   
   The schema change should be backward compatible, since the new field was added with a default value. The deltastreamer should write data to the Hudi table conforming to the new schema.
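   
   For what it's worth, the evolution itself looks backward compatible by Avro's own resolution rules. A quick local check (again just a sketch of ours, assuming the two schemas above are saved, without the illustrative comment, as `wes-old.avsc` and `wes-new.avsc`; those file names are made up):
   
   ```java
   import java.io.File;
   
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaCompatibility;
   
   // Sketch: can records written with the old schema be read with the new one?
   public class CompatibilityCheck {
       public static void main(String[] args) throws Exception {
           Schema oldSchema = new Schema.Parser().parse(new File("wes-old.avsc"));
           Schema newSchema = new Schema.Parser().parse(new File("wes-new.avsc"));
   
           // reader = new schema, writer = old schema -> backward compatibility
           SchemaCompatibility.SchemaPairCompatibility result =
               SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
   
           // Expected to print COMPATIBLE, since the added field carries a default.
           System.out.println(result.getType());
       }
   }
   ```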
   
   **Environment Description**
   
   * Hudi version : `0.11.1`
   
   * Spark version : `3.2.1`
   
   * Hive version : `3.1`
   
   * Hadoop version : `3.1.0.0-78`
   
   * Storage (HDFS/S3/GCS..) : HDFS/LocalFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   The spark-submit command used to run the deltastreamer (to reproduce locally):
   ```bash
   #!/bin/bash
   spark-submit \
       --name "WesHudi" \
        --jars /Users/shaurya.rawat/Documents/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.hadoop.parquet.avro.write-old-list-structure=false \
        --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /Users/shaurya.rawat/Documents/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
       --table-type COPY_ON_WRITE \
       --source-ordering-field dt \
       --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
        --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
        --props file:///Users/shaurya.rawat/Documents/hudi-deltastreamer/config/wes-only.properties \
        --target-base-path file:///Users/shaurya.rawat/Documents/hudi-deltastreamer/data/wes \
       --target-table wes \
       --op UPSERT
   ```
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.hudi.exception.HoodieException: operation has failed
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        ... 3 more
   Caused by: java.lang.NoSuchMethodError: 'org.codehaus.jackson.JsonNode org.apache.avro.Schema$Field.defaultValue()'
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:168)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        ... 4 more
   ```
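   
   A check like the following (illustrative only) can confirm which jar the Avro `Schema` class is actually loaded from at runtime, since the error looks like two Avro versions colliding on the classpath:
   
   ```java
   // Sketch: print the jar that org.apache.avro.Schema is resolved from.
   public class WhichAvro {
       public static void main(String[] args) {
           System.out.println(org.apache.avro.Schema.class
               .getProtectionDomain().getCodeSource().getLocation());
       }
   }
   ```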
   
   Any help is appreciated 🙂 

