Magicbeanbuyer edited a comment on issue #2498:
URL: https://github.com/apache/hudi/issues/2498#issuecomment-775872798


   Hey @vinothchandar,
   
   we've come across the same issue when reading a MERGE_ON_READ table with Spark. We consume data from our AWS MSK topic, write it using `deltastreamer` on AWS EMR, and store it in an S3 bucket.
   
   Our implementation is as follows.
   
   ### Write data
   
   ```
   spark-submit \
     --jars /usr/lib/hudi/hudi-utilities-bundle_2.12-0.7.0.jar \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.hive.convertMetastoreParquet=false" \
     /usr/lib/hudi/hudi-utilities-bundle_2.12-0.7.0.jar \
     --spark-master yarn \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
     --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
     --table-type MERGE_ON_READ \
     --source-ordering-field id \
     --target-base-path $target_base_path \
     --target-table $target_table \
     --hoodie-conf "hoodie.deltastreamer.schemaprovider.source.schema.file=$schema_file_path" \
     --hoodie-conf "hoodie.deltastreamer.schemaprovider.target.schema.file=$schema_file_path" \
     --hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator" \
     --hoodie-conf "hoodie.datasource.write.recordkey.field=id" \
     --hoodie-conf "hoodie.datasource.write.partitionpath.field=partitiontime:TIMESTAMP" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd'T'HH:mm:ss.SSSZ" \
     --hoodie-conf "hoodie.datasource.write.hive_style_partitioning=true" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit=milliseconds" \
     --hoodie-conf "hoodie.deltastreamer.keygen.timebased.timezone=UTC" \
     --hoodie-conf "hoodie.deltastreamer.source.kafka.topic=$kafka_topic" \
     --hoodie-conf "bootstrap.servers=$kafka_bootstrap_servers" \
     --hoodie-conf "auto.offset.reset=earliest"
   ```
   The Hudi table is generated in our S3 bucket without a problem. However, an error is thrown when we try to read it using either Scala (`spark-shell`) or Python (`pyspark`).
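   
   For anyone reproducing this: a quick sanity check we can run from a `spark-shell` is listing the Hudi timeline under the table's `.hoodie` directory; completed deltacommits confirm the writes landed. A minimal sketch using the standard Hadoop `FileSystem` API (the base path below is a placeholder):
   
   ```
   import org.apache.hadoop.fs.Path
   
   // Placeholder: same value as --target-base-path in the spark-submit above
   val basePath = "s3://path/to/base/table"
   
   // Resolve the filesystem for the table's base path (S3 via EMRFS here)
   val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
   
   // List the timeline; .deltacommit files mark completed MERGE_ON_READ writes
   fs.listStatus(new Path(basePath, ".hoodie"))
     .map(_.getPath.getName)
     .filter(_.endsWith(".deltacommit"))
     .foreach(println)
   ```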
   
   ### Read Data
   #### Scala
   ```
   spark-shell \
     --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf "spark.sql.hive.convertMetastoreParquet=false"
   ```
   Trying to load the data:
   ```
   val basePath="s3://path/to/base/table"
   val df = spark.read.format("hudi").load(basePath + "/*/*/*/*")
   ```
   Error message:
   ```
   java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
     at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
     at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
     at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
     ... 47 elided
   ```
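   
   The `NoSuchMethodError` suggests a binary mismatch: `hudi-spark-bundle_2.12:0.7.0` appears to have been compiled against an `InMemoryFileIndex` constructor that the Spark build on our cluster doesn't expose. A minimal sketch to check this, using plain Java reflection in the same `spark-shell`, is to print the constructor signatures actually present at runtime and compare them against the five-argument signature in the error:
   
   ```
   // Print the Spark build we are actually running on (EMR ships its own build)
   println(spark.version)
   
   // List the constructors InMemoryFileIndex exposes at runtime; if none matches
   // the (SparkSession, Seq, Map, Option, FileStatusCache) signature from the
   // error, the bundle and the runtime disagree on the Spark API
   classOf[org.apache.spark.sql.execution.datasources.InMemoryFileIndex]
     .getConstructors
     .foreach(c => println(c.toGenericString))
   ```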
   
   #### Python
   ```
   pyspark \
     --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf "spark.sql.hive.convertMetastoreParquet=false"
   ```
   Trying to load the data:
   ```
   basePath="s3://path/to/base/table"
   df = spark.read.format("hudi").load(basePath + "/*/*/*/*")
   ```
   Error message:
   ```
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 178, in load
       return self._df(self._jreader.load(path))
     File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
     File "/usr/lib/spark/python/pyspark/sql/utils.py", line 128, in deco
       return f(*a, **kw)
     File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", 
line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o79.load.
   : java.lang.NoSuchMethodError: 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
           at 
org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
           at 
org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
           at 
org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
           at 
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
           at 
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
           at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
           at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
           at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
           at scala.Option.getOrElse(Option.scala:189)
           at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
           at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
           at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
           at py4j.Gateway.invoke(Gateway.java:282)
           at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
           at py4j.commands.CallCommand.execute(CallCommand.java:79)
           at py4j.GatewayConnection.run(GatewayConnection.java:238)
           at java.lang.Thread.run(Thread.java:748)
   ```

