ROOBALJINDAL opened a new issue, #7064:
URL: https://github.com/apache/hudi/issues/7064

   
   
   **Describe the problem you faced**
   
   Data ingestion from a CSV file works with FilebasedSchemaProvider but fails 
when the schema for the same CSV file is supplied through SchemaRegistryProvider.
   
   I created the following simple employee.csv file for testing:
   
   ```
   1001,a1,b1,c1
   1111,d1,e1,f1
   
   ```
   
   **Schema: source-flattened.avsc**
   
   ```
   {
     "type" : "record",
     "name" : "triprec",
     "fields": [
       {
         "name": "guidelinesid",
         "type": "long"
       },
       {
         "name": "str_one",
         "type": "string"
       },
       {
         "name": "str_two",
         "type": "string"
       },
       {
         "name": "str_three",
         "type": "string"
       }
     ]
   }
   ```
   
   **Spark command that works (FilebasedSchemaProvider):**
   
   ```
   spark-submit  \
     --jars /usr/lib/spark/external/lib/spark-avro_2.12-3.3.0-amzn-0.jar \
     --master local --deploy-mode client \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar \
     --table-type COPY_ON_WRITE --op BULK_INSERT \
     --target-base-path s3://hudi-multistreamer-roobal/csv-test/synced-table/employee \
     --target-table employee  \
     --min-sync-interval-seconds 60 \
     --source-class org.apache.hudi.utilities.sources.CsvDFSSource \
     --source-ordering-field employeesid \
     --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://my-bucket/csv-test/source-csv/ \
     --hoodie-conf hoodie.datasource.write.recordkey.field=employeesid \
     --enable-hive-sync \
     --hoodie-conf hoodie.datasource.hive_sync.database=default \
     --hoodie-conf hoodie.datasource.hive_sync.table=employee \
     --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
     --hoodie-conf hoodie.datasource.hive_sync.partition_fields= \
     --hoodie-conf hoodie.datasource.write.partitionpath.field= \
     --hoodie-conf hoodie.deltastreamer.csv.sep=, \
     --hoodie-conf hoodie.deltastreamer.csv.header=false \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=s3a://my-bucket/csv-test/schema/source-flattened.avsc \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=s3a://my-bucket/csv-test/schema/source-flattened.avsc \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
   ```
   
   **I then added the same Avro schema to an Apicurio registry and changed the 
command above to the following, but it does not work:**
   
   ```
   spark-submit  \
     --jars /usr/lib/spark/external/lib/spark-avro_2.12-3.3.0-amzn-0.jar \
     --master local --deploy-mode client \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar \
     --table-type COPY_ON_WRITE --op BULK_INSERT \
     --target-base-path s3://hudi-multistreamer-roobal/csv-test/synced-table/employee \
     --target-table employee  \
     --min-sync-interval-seconds 60 \
     --source-class org.apache.hudi.utilities.sources.CsvDFSSource \
     --source-ordering-field employeesid \
     --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://my-bucket/csv-test/source-csv/ \
     --hoodie-conf hoodie.datasource.write.recordkey.field=employeesid \
     --enable-hive-sync \
     --hoodie-conf hoodie.datasource.hive_sync.database=default \
     --hoodie-conf hoodie.datasource.hive_sync.table=employee \
     --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
     --hoodie-conf hoodie.datasource.hive_sync.partition_fields= \
     --hoodie-conf hoodie.datasource.write.partitionpath.field= \
     --hoodie-conf hoodie.deltastreamer.csv.sep=, \
     --hoodie-conf hoodie.deltastreamer.csv.header=false \
     --hoodie-conf schema.registry.url=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6 \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6/subjects/cd2c9220-63bb-468c-9051-57f6abf795da/versions/latest \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   ```
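
Because both runs are meant to use the same schema, one sanity check is to look at what the registry URL actually returns and compare it with the local `source-flattened.avsc`. The following is a minimal sketch, not part of Hudi. It assumes the Confluent-compatible `versions/latest` endpoint returns a JSON object whose `schema` field holds the Avro schema as a string, and the helper names (`fetch_registry_schema`, `extract_schema`, `schemas_match`) are hypothetical.

```python
import json
import urllib.request


def extract_schema(response_text):
    """Pull the Avro schema out of a registry response body.

    Assumption: the body is JSON and the schema is a JSON string under "schema".
    """
    return json.loads(json.loads(response_text)["schema"])


def fetch_registry_schema(url):
    """Fetch and parse the schema served at a '.../versions/latest' URL."""
    with urllib.request.urlopen(url) as resp:
        return extract_schema(resp.read().decode("utf-8"))


def field_summary(schema):
    """Return (name, type) pairs in declaration order."""
    return [(f["name"], f["type"]) for f in schema["fields"]]


def schemas_match(a, b):
    """True when both record schemas declare the same fields in the same order."""
    return field_summary(a) == field_summary(b)
```

To use it, load the local `.avsc` with `json.load`, call `fetch_registry_schema` with the value passed to `hoodie.deltastreamer.schemaprovider.registry.url`, and compare the two with `schemas_match`. Any difference in field names, order, or types would be worth including in the report.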
   
   **Expected behavior**
   
   Since the CSV data file and the schema are identical in both cases and only 
the provider differs, SchemaRegistryProvider should work just as 
FilebasedSchemaProvider does. Am I missing any configuration?
   
   
   **Environment Description**
   Using AWS EMR-6.8.0 cluster
   
   * Hudi version : 0.11.1
   
   * Spark version :3.3.0
   
   * Hive version :3.1.3
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No, Using AWS EMR cluster
   
   
   **Stacktrace**
   
   ```
   22/10/26 05:19:35 DEBUG ChannelEndPoint: changeInterests p=false 0->1 for SocketChannelEndPoint@5402de4a{l=/xx.xxx.xx.xxx:36061,r=/xx.xxx.xx.xxx:56288,OPEN,fill=FI,flush=-,to=0/30000}{io=0/1,kio=0,kro=1}->HttpConnection@32de932e[p=HttpParser{s=START,0 of -1},g=HttpGenerator@1ad2f315{s=START}]=>HttpChannelOverHttp@2e0e0711{s=HttpChannelState@60192b1{s=IDLE rs=BLOCKING os=OPEN is=IDLE awp=false se=false i=true al=0},r=2,c=false/false,a=IDLE,uri=null,age=0}
   22/10/26 05:19:35 DEBUG BatchedMarkerCreationRunnable: Finish batch processing of create marker requests in 37 ms
   22/10/26 05:19:35 DEBUG ChannelEndPoint: Key interests updated 0 -> 1 on SocketChannelEndPoint@5402de4a{l=/xx.xxx.xx.xxx:36061,r=/xx.xxx.xx.xxx:56288,OPEN,fill=FI,flush=-,to=0/30000}{io=1/1,kio=1,kro=1}->HttpConnection@32de932e[p=HttpParser{s=START,0 of -1},g=HttpGenerator@1ad2f315{s=START}]=>HttpChannelOverHttp@2e0e0711{s=HttpChannelState@60192b1{s=IDLE rs=BLOCKING os=OPEN is=IDLE awp=false se=false i=true al=0},r=2,c=false/false,a=IDLE,uri=null,age=0}
   22/10/26 05:19:35 INFO HoodieCreateHandle: New CreateHandle for partition : with fileId afd625e0-d213-4f0f-b3e5-bb3410bb97f8-0
   22/10/26 05:19:35 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=1111 partitionPath=}, currentLocation='null', newLocation='null'}
   java.io.EOFException: null
           at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:999) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:405) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:313) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:156) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:146) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
           at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   22/10/26 05:19:35 INFO HoodieCreateHandle: Closing the file afd625e0-d213-4f0f-b3e5-bb3410bb97f8-0 as we are done with all the records 0
   22/10/26 05:19:35 INFO MultipartUploadOutputStream: close closed:false s3://hudi-multistreamer-roobal/csv-test/synced-table/employee/afd625e0-d213-4f0f-b3e5-bb3410bb97f8-0_1-17-16_20221026051925630.parquet
   22/10/26 05:19:36 INFO HoodieCreateHandle: CreateHandle for partitionPath  fileID afd625e0-d213-4f0f-b3e5-bb3410bb97f8-0, took 284 ms.
   22/10/26 05:19:36 INFO BoundedInMemoryExecutor: Queue Consumption is done; notifying producer threads
   22/10/26 05:19:36 INFO MemoryStore: Block rdd_33_1 stored as values in memory (estimated size 1976.0 B, free 911.2 MiB)
   22/10/26 05:19:36 INFO BlockManagerInfo: Added rdd_33_1 in memory on ip-10-151-46-141.us-west-2.compute.internal:43387 (size: 1976.0 B, free: 912.1 MiB)
   22/10/26 05:19:36 INFO Executor: Finished task 1.0 in stage 17.0 (TID 16). 1240 bytes result sent to driver
   22/10/26 05:19:36 INFO TaskSetManager: Finished task 1.0 in stage 17.0 (TID 16) in 336 ms on ip-10-151-46-141.us-west-2.compute.internal (executor driver) (2/2)
   ```
   
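
The EOFException is thrown while `HoodieAvroUtils.bytesToAvro` reads a string field. In general that means the decoder ran out of bytes before the schema it was given had been satisfied. The sketch below is not Hudi or Avro library code and is not a diagnosis of this issue. It hand-rolls the Avro binary encoding for `long` (zigzag varint) and `string` (length-prefixed UTF-8) to illustrate, under that assumption, how bytes written with one field list fail when read back with a longer one. All function names are hypothetical.

```python
import io


def write_long(buf, n):
    """Avro long: zigzag-encode, then write as a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    while z & ~0x7F:
        buf.write(bytes([(z & 0x7F) | 0x80]))
        z >>= 7
    buf.write(bytes([z]))


def read_long(buf):
    """Inverse of write_long; raises EOFError if the input ends early."""
    shift, z = 0, 0
    while True:
        b = buf.read(1)
        if not b:
            raise EOFError("ran out of bytes")
        z |= (b[0] & 0x7F) << shift
        if not b[0] & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)


def write_string(buf, s):
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    write_long(buf, len(data))
    buf.write(data)


def read_string(buf):
    n = read_long(buf)
    data = buf.read(n)
    if len(data) < n:
        raise EOFError("ran out of bytes")
    return data.decode("utf-8")


def encode(values):
    """Encode a flat record: ints as Avro longs, everything else as strings."""
    buf = io.BytesIO()
    for v in values:
        if isinstance(v, int):
            write_long(buf, v)
        else:
            write_string(buf, v)
    return buf.getvalue()


def decode(data, types):
    """Decode a flat record given the field types the reader expects."""
    buf = io.BytesIO(data)
    return [read_long(buf) if t == "long" else read_string(buf) for t in types]
```

Encoding the second CSV row with `encode([1111, "d1", "e1", "f1"])` and decoding it with `["long", "string", "string", "string"]` round-trips, while decoding the same bytes with one extra `"string"` entry raises `EOFError`. If the schema resolved through the registry differs from the one used to serialize the records, a similar failure would be expected.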
   

