dataproblems commented on issue #12068:
URL: https://github.com/apache/hudi/issues/12068#issuecomment-2414730327

   Hi @rangareddy - Sure. I will also try that with the code you attached. 
However, I made a change to it since I'm using ComplexKeyGenerator with 
multiple fields in the record key. 
   
   
   Here's how I start my spark-shell
   
   For EMR 6.11.0: 
   
   ```
   sudo spark-shell \
     --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:1.0.0-beta2,org.apache.hudi:hudi-aws:1.0.0-beta2 \
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
     --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
     --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar
   ```
   
   For EMR 6.15.0
   
   ```
   sudo spark-shell \
     --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0-beta2,org.apache.hudi:hudi-aws:1.0.0-beta2 \
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
     --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
     --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar
   ```
   
   Here's the full code I used: 
   
   ```
   
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.common.table.HoodieTableConfig._
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
   import org.apache.hudi.common.model.HoodieRecord
   import org.apache.hudi.common.table.HoodieTableConfig
   import org.apache.hudi.config.HoodieIndexConfig
   import org.apache.hudi.common.config.HoodieStorageConfig
   import org.apache.hudi.common.config.HoodieMetadataConfig
   import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
   import spark.implicits._
   import org.apache.hudi.keygen.ComplexKeyGenerator
   
   
   val columns = Seq("ts","uuid","rider","driver","fare","city")
   val data =
     Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
       (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
       (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
       (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
       (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
   
   var inserts = spark.createDataFrame(data).toDF(columns:_*)
   
   val tableName = "trips_table"
   val basePath = "s3://somebucket/path/trips_with_3.3"
   
   val bulkWriteOptions: Map[String, String] = Map(
     DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
     DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
     HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
     HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
     "hoodie.parquet.small.file.limit" -> "1073741824",
     HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false",
     HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
     HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
     HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
     DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
     "hoodie.metadata.record.index.enable" -> "true",
     "hoodie.metadata.enable" -> "true",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.clustering.inline" -> "true",
     "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
     "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
     "hoodie.datasource.write.partitionpath.field" -> "city",
     "hoodie.datasource.write.recordkey.field" -> "driver,rider",
     "hoodie.datasource.write.precombine.field" -> "ts",
     "hoodie.table.name" -> tableName,
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName
   )
   
   inserts.write.format("hudi").
     options(bulkWriteOptions).
     mode(Overwrite).
     save(basePath)
   
   val tripsDF = spark.read.format("hudi").load(basePath)
   tripsDF.show(false)
   ```
   
   Here are the logs with Spark 3.3 on EMR 6.11.0:
   
   ```
   4/10/15 18:14:49 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead.
   java.lang.IllegalStateException
     at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
     at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
     at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
     at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
     at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
     at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
     ... 55 elided
   ```
   
   Then I used EMR 6.15.0 with Spark 3.4 and got the same exception:
   
   ```
   java.lang.IllegalStateException
     at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
     at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
     at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
     at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
     at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
     at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
     ... 47 elided
   ```
   
   I hope this clarifies the issue I'm facing in my environment. I would also like to add that if I switch to SimpleKeyGenerator and use `uuid` as the record key, as you did, the read works.
   
   Is your recommendation that we should use SimpleKeyGenerator with a single record key field, since there is a bug in ComplexKeyGenerator / record keys with multiple fields?
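   
   For reference, the working variant described above can be sketched as an override applied to the write options from the full code. This is only a sketch: `withSimpleKey` is a hypothetical helper, not part of Hudi, and `hoodie.datasource.write.keygenerator.class` is assumed to be the string key behind `DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME`.
   
   ```scala
   // Hypothetical helper (not a Hudi API): returns a copy of the given write
   // options with a single-field record key and SimpleKeyGenerator.
   // With `++`, entries from the right-hand map replace matching keys in `base`.
   def withSimpleKey(base: Map[String, String]): Map[String, String] =
     base ++ Map(
       // Single record key field, as in the variant that reads back successfully.
       "hoodie.datasource.write.recordkey.field" -> "uuid",
       // Assumed string key for DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.
       "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.SimpleKeyGenerator"
     )
   ```
   
   In the spark-shell session above it would be used as `inserts.write.format("hudi").options(withSimpleKey(bulkWriteOptions)).mode(Overwrite).save(basePath)`; every other option stays exactly as in the failing run, so the two runs differ only in the record key configuration.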
   
   

