li36909 commented on pull request #2751:
URL: https://github.com/apache/hudi/pull/2751#issuecomment-812271790
@nsivabalan I ran the test on Hudi 0.7. Yes, you are right: I start a
spark-shell for upserting and query the same table through the Spark
datasource API, and then the problem arises. The cause of the problem is
clear: during the query, Hudi gets the partitions in
MergeOnReadSnapshotRelation and builds a new fsview in
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile, so when a write operation
is happening concurrently, HoodieRealtimeInputFormatUtils.groupLogsByBaseFile
will find some new base files.
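The race can be illustrated with a minimal, self-contained Scala sketch (this is NOT Hudi code; FsViewRaceSketch and its file names are made up for illustration): a "query" side snapshots the partition's base-file listing, a concurrent "writer" commits a new base file, and a second listing taken later (standing in for the fsview rebuilt inside groupLogsByBaseFile) sees a base file the first listing never saw.

```scala
import scala.collection.mutable

// Hypothetical sketch of the listing race; not taken from the Hudi codebase.
object FsViewRaceSketch {
  // Shared "file system": the base files of one partition.
  val files = mutable.Set("file_1.parquet")

  def listBaseFiles(): Set[String] = files.synchronized { files.toSet }

  def main(args: Array[String]): Unit = {
    // What MergeOnReadSnapshotRelation would see when the query starts.
    val firstListing = listBaseFiles()

    // A concurrent bulk_insert commits a new base file in between.
    val writer = new Thread(new Runnable {
      override def run(): Unit = files.synchronized { files += "file_2.parquet" }
    })
    writer.start()
    writer.join()

    // What the rebuilt fsview in groupLogsByBaseFile would see afterwards.
    val secondListing = listBaseFiles()

    // The rebuilt view contains a base file the first listing never saw.
    println(secondListing -- firstListing) // prints Set(file_2.parquet)
  }
}
```

The two listings disagree because each one re-reads shared state instead of reusing one consistent snapshot, which is exactly the inconsistency described above.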
We can reproduce this issue by adding a Thread.sleep(6) in
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile and then running this test:
step 1: write first batch into a hudi table:
spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_mor_table"
val basePath = "hdfs://hacluster/tmp/hudi_mor_table"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.datasource.write.operation", "bulk_insert").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning", "false").
option(TABLE_NAME, tableName).
option("hoodie.datasource.hive_sync.enable", "true").
option("hoodie.datasource.hive_sync.use_jdbc", "false").
option("hoodie.datasource.hive_sync.table", "hudi_mor_test").
option("hoodie.datasource.hive_sync.partition_extractor_class",
"org.apache.hudi.hive.MultiPartKeysValueExtractor").
option("hoodie.datasource.hive_sync.partition_fields",
"continent,country,city").
option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism","2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.delete.shuffle.parallelism","2").
mode(Append).
save(basePath);
step 2: run a query in a new spark-shell (while the query hangs at the
Thread.sleep, start writing a new batch as in step 3):
spark.read.format("hudi").load("hdfs://hacluster/tmp/hudi_mor_table/*/*/*/*").count
step 3: go back to the spark-shell from step 1 and write a new batch:
df.write.format("org.apache.hudi").
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.datasource.write.operation", "bulk_insert").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.hive_style_partitioning", "false").
option(TABLE_NAME, tableName).
option("hoodie.datasource.hive_sync.enable", "true").
option("hoodie.datasource.hive_sync.use_jdbc", "false").
option("hoodie.datasource.hive_sync.table", "hudi_mor_test").
option("hoodie.datasource.hive_sync.partition_extractor_class",
"org.apache.hudi.hive.MultiPartKeysValueExtractor").
option("hoodie.datasource.hive_sync.partition_fields",
"continent,country,city").
option("hoodie.datasource.hive_sync.assume_date_partitioning", "false").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism","2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.delete.shuffle.parallelism","2").
mode(Append).
save(basePath);
We can see that step 2 will throw an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org