dataproblems commented on issue #12068:
URL: https://github.com/apache/hudi/issues/12068#issuecomment-2414730327
Hi @rangareddy - sure, I will also try that with the code you attached. However, I made one change to it, since I'm using ComplexKeyGenerator with multiple fields in the record key.
Here's how I start spark-shell.
For EMR 6.11.0:
```
sudo spark-shell \
  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:1.0.0-beta2,org.apache.hudi:hudi-aws:1.0.0-beta2 \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar
```
For EMR 6.15.0:
```
sudo spark-shell \
  --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0-beta2,org.apache.hudi:hudi-aws:1.0.0-beta2 \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar
```
Here's the full code I used:
```
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import spark.implicits._
import org.apache.hudi.keygen.ComplexKeyGenerator
val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
val data = Seq(
  (1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
  (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
  (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
  (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
  (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))

var inserts = spark.createDataFrame(data).toDF(columns: _*)
val tableName = "trips_table"
val basePath = "s3://somebucket/path/trips_with_3.3"
val bulkWriteOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false",
  HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.clustering.inline" -> "true",
  "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
  "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
  "hoodie.datasource.write.partitionpath.field" -> "city",
  "hoodie.datasource.write.recordkey.field" -> "driver,rider",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName
)
inserts.write.format("hudi").
options(bulkWriteOptions).
mode(Overwrite).
save(basePath)
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.show(false)
```
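In case it helps with triage, this is how I can dump the table config that Hudi persists under the base path after the write (a minimal sketch using plain Spark text reads; the property names in the comment are my assumption about what to look for):
```
// Print the persisted table config; I expect it to show the record key
// fields and key generator class that were written for this table
// (e.g. hoodie.table.recordkey.fields, hoodie.table.keygenerator.class).
spark.read.textFile(basePath + "/.hoodie/hoodie.properties").show(100, false)
```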
Here are the logs with Spark 3.3 on EMR 6.11.0:
```
24/10/15 18:14:49 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead.
java.lang.IllegalStateException
  at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
  at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
  at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
  at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
  ... 55 elided
```
Then I used EMR 6.15.0 with Spark 3.4 and got the same exception:
```
java.lang.IllegalStateException
  at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
  at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
  at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
  at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
  ... 47 elided
```
I hope this helps clarify the issue I'm facing in my environment. I would also like to add that if I switch to SimpleKeyGenerator with `uuid` as the single record key field, as you had, the read does work (see the sketch below for exactly what I change). Is your recommendation to use SimpleKeyGenerator with a single record key field, given that there appears to be a bug with ComplexKeyGenerator / multi-field record keys?
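For comparison, this is a sketch of the only things I change for the working SimpleKeyGenerator run (`simpleKeyOptions` is just a name I'm using here; everything else stays the same as `bulkWriteOptions` above):
```
import org.apache.hudi.keygen.SimpleKeyGenerator

// Same write options as above, but with a single record key field (uuid)
// and SimpleKeyGenerator. Reading the table back works with this variant.
val simpleKeyOptions: Map[String, String] = bulkWriteOptions ++ Map(
  "hoodie.datasource.write.recordkey.field" -> "uuid",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName
)

inserts.write.format("hudi").
  options(simpleKeyOptions).
  mode(Overwrite).
  save(basePath)

spark.read.format("hudi").load(basePath).show(false)
```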