(hudi) branch master updated: [HUDI-7378] Fix Spark SQL DML with custom key generator (#10615)
This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 17ea14ab6d6 [HUDI-7378] Fix Spark SQL DML with custom key generator (#10615) 17ea14ab6d6 is described below commit 17ea14ab6d6a8ca7ecef2cfcdbc67b0c87f23987 Author: Y Ethan Guo AuthorDate: Fri Apr 12 22:51:03 2024 -0700 [HUDI-7378] Fix Spark SQL DML with custom key generator (#10615) --- .../factory/HoodieSparkKeyGeneratorFactory.java| 4 + .../org/apache/hudi/util/SparkKeyGenUtils.scala| 16 +- .../scala/org/apache/hudi/HoodieWriterUtils.scala | 20 +- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 60 ++- .../spark/sql/hudi/TestProvidesHoodieConfig.scala | 79 +++ .../hudi/command/MergeIntoHoodieTableCommand.scala | 5 +- .../TestSparkSqlWithCustomKeyGenerator.scala | 571 + 7 files changed, 742 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java index 1ea5adcd6b4..dcc2eaec9eb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java @@ -79,6 +79,10 @@ public class HoodieSparkKeyGeneratorFactory { public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { String keyGeneratorClass = getKeyGeneratorClassName(props); +return createKeyGenerator(keyGeneratorClass, props); + } + + public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedProperties props) throws IOException { boolean autoRecordKeyGen = KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props) //Need to prevent overwriting the keygen for 
spark sql merge into because we need to extract //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed. diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala index 7b91ae5a728..bd094464096 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala @@ -21,8 +21,8 @@ import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.hudi.keygen.constant.KeyGeneratorOptions -import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory +import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator} object SparkKeyGenUtils { @@ -35,6 +35,20 @@ object SparkKeyGenUtils { getPartitionColumns(keyGenerator, props) } + /** + * @param KeyGenClassNameOption key generator class name if present. + * @param props config properties. 
+ * @return partition column names only, concatenated by "," + */ + def getPartitionColumns(KeyGenClassNameOption: Option[String], props: TypedProperties): String = { +val keyGenerator = if (KeyGenClassNameOption.isEmpty) { + HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) +} else { + HoodieSparkKeyGeneratorFactory.createKeyGenerator(KeyGenClassNameOption.get, props) +} +getPartitionColumns(keyGenerator, props) + } + /** * @param keyGen key generator class name * @return partition columns diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 63495b0eede..5df773542d6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -201,8 +201,26 @@ object HoodieWriterUtils { diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n") } +// Please note that the validation of partition path fields
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua merged PR #10615: URL: https://github.com/apache/hudi/pull/10615 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT]After compacting, there are a large number of logs with size 0, and they can never be cleared. [hudi]
MrAladdin commented on issue #11007: URL: https://github.com/apache/hudi/issues/11007#issuecomment-205291 > rollback the compaction I'm not sure which compaction to roll back or how to locate it, since the table has already been compacted multiple times. If it's not addressed, will it be cleared automatically later? Is there any specific documentation on this issue? I'd like to understand the underlying mechanism better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT]There is a deltacommit that remains in the REQUESTED state [hudi]
MrAladdin commented on issue #11010: URL: https://github.com/apache/hudi/issues/11010#issuecomment-2052898550 > You can trigger revert with Hudi CLI. 您可以使用 Hudi CLI 触发还原。 Please, how can I restart, can you give me a specific command example? Also, I would like to ask why serial deltacommits would occur in this situation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
yihua commented on code in PR #10956: URL: https://github.com/apache/hudi/pull/10956#discussion_r1563527600 ## hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.util +import org.apache.hudi.common.util.InternalSchemaCache +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty +import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.internal.schema.action.InternalSchemaMerger +import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} +import org.apache.parquet.hadoop.metadata.FileMetaData +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection} +import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema +import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType} + +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + +abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration, + filePath: Path, + requiredSchema: StructType, + partitionSchema: StructType) { + // Fetch internal schema + private lazy val internalSchemaStr: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + + private lazy val querySchemaOption: util.Option[InternalSchema] = pruneInternalSchema(internalSchemaStr, requiredSchema) + + var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent + + private lazy val tablePath: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) + private lazy val fileSchema: InternalSchema = 
if (shouldUseInternalSchema) { +val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; +val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) +InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) + } else { +null + } + + def rebuildFilterFromParquet(filter: Filter): Filter = { +rebuildFilterFromParquetHelper(filter, fileSchema, querySchemaOption.orElse(null)) + } + + private def rebuildFilterFromParquetHelper(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { +if (fileSchema == null || querySchema == null) { + oldFilter +} else { + oldFilter match { +case eq: EqualTo => + val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute) +case eqs: EqualNullSafe => + val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute) +case gt: GreaterThan => + val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute) +case gtr: GreaterThanOrEqual => + val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute) +case lt: LessThan
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
yihua commented on code in PR #10956: URL: https://github.com/apache/hudi/pull/10956#discussion_r1563514428 ## hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.util +import org.apache.hudi.common.util.InternalSchemaCache +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty +import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.internal.schema.action.InternalSchemaMerger +import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} +import org.apache.parquet.hadoop.metadata.FileMetaData +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection} +import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema +import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType} + +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + +abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration, + filePath: Path, + requiredSchema: StructType, + partitionSchema: StructType) { + // Fetch internal schema + private lazy val internalSchemaStr: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + + private lazy val querySchemaOption: util.Option[InternalSchema] = pruneInternalSchema(internalSchemaStr, requiredSchema) + + var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent + + private lazy val tablePath: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) + private lazy val fileSchema: InternalSchema = 
if (shouldUseInternalSchema) { +val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; +val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) +InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) + } else { +null + } + + def rebuildFilterFromParquet(filter: Filter): Filter = { +rebuildFilterFromParquetHelper(filter, fileSchema, querySchemaOption.orElse(null)) + } + + private def rebuildFilterFromParquetHelper(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { +if (fileSchema == null || querySchema == null) { + oldFilter +} else { + oldFilter match { +case eq: EqualTo => + val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute) +case eqs: EqualNullSafe => + val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute) +case gt: GreaterThan => + val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute) +case gtr: GreaterThanOrEqual => + val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema) + if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute) +case lt: LessThan
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
hudi-bot commented on PR #10615: URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052767220 ## CI report: * 805ba35b65afbb1daccbcf00291fd520a69c5584 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23232) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
yihua commented on code in PR #10956: URL: https://github.com/apache/hudi/pull/10956#discussion_r1563399841 ## hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala: ## @@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean, } val taskContext = Option(TaskContext.get()) if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( -convertTz.orNull, -datetimeRebaseMode.toString, -enableOffHeapColumnVector && taskContext.isDefined, -capacity) + val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) { +schemaEvolutionUtils.buildVectorizedReader( + convertTz, + datetimeRebaseMode, + enableOffHeapColumnVector, + taskContext, + capacity) + } else { +new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + } Review Comment: Fold the check `schemaEvolutionUtils.shouldUseInternalSchema` into `schemaEvolutionUtils.buildVectorizedReader` (returning new VectorizedParquetRecordReader inside if internal schema is disabled) and simplify the logic here as ```suggestion val vectorizedReader = schemaEvolutionUtils.buildVectorizedReader( convertTz, datetimeRebaseMode, enableOffHeapColumnVector, taskContext, capacity) ``` ## hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala: ## @@ -141,8 +150,20 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean, } val taskContext = Option(TaskContext.get()) if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( -convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) { Review Comment: Do we already have tests around schema evolution using the new spark file readers? 
## hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala: ## @@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean, } val taskContext = Option(TaskContext.get()) if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( -convertTz.orNull, -datetimeRebaseMode.toString, -enableOffHeapColumnVector && taskContext.isDefined, -capacity) + val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) { +schemaEvolutionUtils.buildVectorizedReader( + convertTz, + datetimeRebaseMode, + enableOffHeapColumnVector, + taskContext, + capacity) + } else { +new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + } Review Comment: Same for readers for other Spark versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052737016 ## CI report: * 08eee17c0e936c02e100b65aeba27f81a232452c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23231) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
the-other-tim-brown commented on code in PR #11008: URL: https://github.com/apache/hudi/pull/11008#discussion_r1563428947 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java: ## @@ -89,10 +89,13 @@ protected List getSmallFiles(String partitionPath) { private List getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) { // If we can index log files, we can add more inserts to log files for fileIds NOT including those under // pending compaction +Comparator comparator = Comparator.comparing(fileSlice -> getTotalFileSize(fileSlice)) +.thenComparing(FileSlice::getFileId); if (table.getIndex().canIndexLogFiles()) { return table.getSliceView() .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false) .filter(this::isSmallFile) + .sorted(comparator) .collect(Collectors.toList()); Review Comment: I think it makes sense to prefer the smallest files first as candidates to minimize IO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
danny0405 commented on code in PR #11008: URL: https://github.com/apache/hudi/pull/11008#discussion_r1563427282 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java: ## @@ -89,10 +89,13 @@ protected List getSmallFiles(String partitionPath) { private List getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) { // If we can index log files, we can add more inserts to log files for fileIds NOT including those under // pending compaction +Comparator comparator = Comparator.comparing(fileSlice -> getTotalFileSize(fileSlice)) +.thenComparing(FileSlice::getFileId); if (table.getIndex().canIndexLogFiles()) { return table.getSliceView() .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false) .filter(this::isSmallFile) + .sorted(comparator) .collect(Collectors.toList()); Review Comment: Should we just fix the tests? Do we have gains for the sort in production? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7609] Support array field type whose element type can be nullable [hudi]
danny0405 commented on code in PR #11006: URL: https://github.com/apache/hudi/pull/11006#discussion_r1563425831 ## hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java: ## @@ -140,7 +141,7 @@ private static String convertGroupField(GroupType field) { ValidationUtils.checkArgument(field.getFieldCount() == 1, "Illegal List type: " + field); Type repeatedType = field.getType(0); if (isElementType(repeatedType, field.getName())) { - return arrayType(repeatedType, false); + return arrayType(repeatedType, true); Review Comment: Can we write a simple test for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT]There is a deltacommit that remains in the REQUESTED state [hudi]
danny0405 commented on issue #11010: URL: https://github.com/apache/hudi/issues/11010#issuecomment-2052727390 You can trigger a revert with the Hudi CLI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7608] Fix Flink table creation configuration not taking effect when writing… [hudi]
danny0405 commented on code in PR #11005: URL: https://github.com/apache/hudi/pull/11005#discussion_r1563423256 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala: ## @@ -43,6 +43,11 @@ object HoodieOptionConfig { */ val SQL_VALUE_TABLE_TYPE_MOR = "mor" + /** + * The short name for the value of index type. + */ + val SQL_VALUE_INDEX_TYPE = "index.type" Review Comment: Maybe we can fix the options in the Hudi catalog. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT]After compacting, there are a large number of logs with size 0, and they can never be cleared. [hudi]
danny0405 commented on issue #11007: URL: https://github.com/apache/hudi/issues/11007#issuecomment-2052724244 You can roll back the compaction with the CLI. The cleaner will eventually clean these logs: before 1.0, log cleaning actually appends new log blocks to the corrupt files, which does not remove the files immediately. These files are eventually cleaned according to the configured cleaning strategy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT] StreamWriteFunction support Exactly-Once in Flink? [hudi]
danny0405 commented on issue #11004: URL: https://github.com/apache/hudi/issues/11004#issuecomment-2052723145 A checkpoint triggers a commit to the Hudi table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [SUPPORT]There is a deltacommit that remains in the REQUESTED state [hudi]
MrAladdin opened a new issue, #11010: URL: https://github.com/apache/hudi/issues/11010 **Describe the problem you faced** There is a deltacommit that remains in the REQUESTED state. Does it have an impact, will it cause data loss, and how should I deal with it next? **Environment Description** * Hudi version :0.14.1 * Spark version :3.4.1 * Hive version :3.1.2 * Hadoop version :3.1.3 * Storage (HDFS/S3/GCS..) :hdfs * Running on Docker? (yes/no) :no **Additional context** spark structured streaming, upsert, mor(record_index) .writeStream .format("hudi") .option("hoodie.table.base.file.format", "PARQUET") .option("hoodie.allow.empty.commit", "true") .option("hoodie.datasource.write.drop.partition.columns","false") .option("hoodie.table.services.enabled", "true") .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-v1-writer-1") .option(PRECOMBINE_FIELD.key(), "date_kbiUdate") .option(RECORDKEY_FIELD.key(), "records_key") .option(PARTITIONPATH_FIELD.key(), "partition_index_date") .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option("hoodie.combine.before.upsert", "true") .option("hoodie.datasource.write.payload.class","org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") //markers .option("hoodie.write.markers.type", "DIRECT") //timeline server .option("hoodie.embed.timeline.server", "true") //File System View Storage Configurations .option("hoodie.filesystem.view.remote.timeout.secs", "1200") .option("hoodie.filesystem.view.remote.retry.enable", "true") .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500") .option("hoodie.filesystem.view.remote.retry.max_numbers", "15") .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000") //schema cache .option("hoodie.schema.cache.enable", "true") //spark write
.option("hoodie.datasource.write.streaming.ignore.failed.batch", "false") .option("hoodie.datasource.write.streaming.retry.count", "6") .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000") //metadata .option("hoodie.metadata.enable", "true") .option("hoodie.metadata.index.async", "false") .option("hoodie.metadata.index.check.timeout.seconds", "900") .option("hoodie.auto.adjust.lock.configs", "true") .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true") .option("hoodie.metadata.index.column.stats.enable", "false") .option("hoodie.metadata.index.column.stats.parallelism", "100") .option("hoodie.metadata.index.column.stats.file.group.count", "4") .option("hoodie.metadata.index.column.stats.column.list","date_udate,date_publishedAt") .option("hoodie.metadata.compact.max.delta.commits", "10") //metadata .option("hoodie.metadata.record.index.enable", "true") .option("hoodie.index.type", "RECORD_INDEX") .option("hoodie.metadata.max.init.parallelism", "10") .option("hoodie.metadata.record.index.min.filegroup.count", "10") .option("hoodie.metadata.record.index.max.filegroup.count", "1") .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824") .option("hoodie.metadata.auto.initialize", "true") .option("hoodie.metadata.record.index.growth.factor", "2.0") .option("hoodie.metadata.max.logfile.size", "2147483648") .option("hoodie.metadata.log.compaction.enable", "false") .option("hoodie.metadata.log.compaction.blocks.threshold", "5") .option("hoodie.metadata.max.deltacommits.when_pending", "1000") //file size .option("hoodie.parquet.field_id.write.enabled", "true") .option("hoodie.copyonwrite.insert.auto.split", "true") .option("hoodie.record.size.estimation.threshold", "1.0") .option("hoodie.parquet.block.size", "536870912") .option("hoodie.parquet.max.file.size", "536870912") .option("hoodie.parquet.small.file.limit", "314572800") .option("hoodie.logfile.max.size", "536870912") .option("hoodie.logfile.data.block.max.size", 
"536870912") .option("hoodie.logfile.to.parquet.compression.ratio", "0.35")
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052706711 ## CI report: * c8f507bcac03c7183893400487a1885400c46853 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
hudi-bot commented on PR #10615: URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052703726 ## CI report: * dfab8e1285bf0241eea2e71f9d85607c647446d7 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23212) * 805ba35b65afbb1daccbcf00291fd520a69c5584 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23232) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
hudi-bot commented on PR #10615: URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052699814 ## CI report: * dfab8e1285bf0241eea2e71f9d85607c647446d7 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23212) * 805ba35b65afbb1daccbcf00291fd520a69c5584 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (HUDI-7615) Mark a few write configs with the correct sinceVersion
[ https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7615: Component/s: configs > Mark a few write configs with the correct sinceVersion > -- > > Key: HUDI-7615 > URL: https://issues.apache.org/jira/browse/HUDI-7615 > Project: Apache Hudi > Issue Type: Improvement > Components: configs >Reporter: Ethan Guo >Assignee: tao pan >Priority: Major > > The following write configs are not associated with the correct since version > (ConfigProperty#sinceVersion), which should be fixed. The correct versions are > listed below: > hoodie.metadata.log.compaction.enable -> 0.14.0 > hoodie.log.compaction.enable -> 0.14.0 > hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> > 0.10.1 > hoodie.datasource.write.payload.type -> 1.0.0 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7615) Mark a few write configs with the correct sinceVersion
[ https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7615: Description: The following write configs are not associated with the correct since version (ConfigProperty#sinceVersion), which should be fixed. The correct versions are listed below: hoodie.metadata.log.compaction.enable -> 0.14.0 hoodie.log.compaction.enable -> 0.14.0 hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 0.10.1 hoodie.datasource.write.payload.type -> 1.0.0 > Mark a few write configs with the correct sinceVersion > -- > > Key: HUDI-7615 > URL: https://issues.apache.org/jira/browse/HUDI-7615 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Assignee: tao pan >Priority: Major > > The following write configs are not associated with the correct since version > (ConfigProperty#sinceVersion), which should be fixed. The correct versions are > listed below: > hoodie.metadata.log.compaction.enable -> 0.14.0 > hoodie.log.compaction.enable -> 0.14.0 > hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> > 0.10.1 > hoodie.datasource.write.payload.type -> 1.0.0 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7615) Mark a few write configs with the correct sinceVersion
[ https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7615: Fix Version/s: 0.15.0 1.0.0 > Mark a few write configs with the correct sinceVersion > -- > > Key: HUDI-7615 > URL: https://issues.apache.org/jira/browse/HUDI-7615 > Project: Apache Hudi > Issue Type: Improvement > Components: configs >Reporter: Ethan Guo >Assignee: tao pan >Priority: Major > Fix For: 0.15.0, 1.0.0 > > > The following write configs are not associated with the correct since version > (ConfigProperty#sinceVersion), which should be fixed. The correct versions are > listed below: > hoodie.metadata.log.compaction.enable -> 0.14.0 > hoodie.log.compaction.enable -> 0.14.0 > hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> > 0.10.1 > hoodie.datasource.write.payload.type -> 1.0.0 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7615) Mark a few write configs with the correct sinceVersion
Ethan Guo created HUDI-7615: --- Summary: Mark a few write configs with the correct sinceVersion Key: HUDI-7615 URL: https://issues.apache.org/jira/browse/HUDI-7615 Project: Apache Hudi Issue Type: Improvement Reporter: Ethan Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (HUDI-7615) Mark a few write configs with the correct sinceVersion
[ https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo reassigned HUDI-7615: --- Assignee: tao pan (was: Ethan Guo) > Mark a few write configs with the correct sinceVersion > -- > > Key: HUDI-7615 > URL: https://issues.apache.org/jira/browse/HUDI-7615 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Assignee: tao pan >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (HUDI-7615) Mark a few write configs with the correct sinceVersion
[ https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo reassigned HUDI-7615: --- Assignee: Ethan Guo > Mark a few write configs with the correct sinceVersion > -- > > Key: HUDI-7615 > URL: https://issues.apache.org/jira/browse/HUDI-7615 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Assignee: Ethan Guo >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua commented on code in PR #10615: URL: https://github.com/apache/hudi/pull/10615#discussion_r1563323590 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ## @@ -530,6 +539,40 @@ object ProvidesHoodieConfig { filterNullValues(overridingOpts) } + /** + * @param tableConfigKeyGeneratorClassName key generator class name in the table config. + * @param partitionFieldNamesWithoutKeyGenType partition field names without key generator types + * from the table config. + * @param catalogTable HoodieCatalogTable instance to fetch table properties. + * @return the write config value to set for "hoodie.datasource.write.partitionpath.field". + */ + def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: String, + partitionFieldNamesWithoutKeyGenType: String, + catalogTable: HoodieCatalogTable): String = { +if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) { + partitionFieldNamesWithoutKeyGenType +} else { + val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key()) + val keyGenClass = ReflectionUtils.getClass(tableConfigKeyGeneratorClassName) + if (classOf[CustomKeyGenerator].equals(keyGenClass) Review Comment: The assumption is that these key generators should not be extended. We should keep it this way for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052662120 ## CI report: * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220) * 08eee17c0e936c02e100b65aeba27f81a232452c Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23231) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052654450 ## CI report: * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220) * 08eee17c0e936c02e100b65aeba27f81a232452c UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052654190 ## CI report: * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227) * c8f507bcac03c7183893400487a1885400c46853 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052647727 ## CI report: * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227) * c8f507bcac03c7183893400487a1885400c46853 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua commented on code in PR #10615: URL: https://github.com/apache/hudi/pull/10615#discussion_r1563245298 ## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala: ## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.functional.TestSparkSqlWithCustomKeyGenerator._ +import org.apache.hudi.util.SparkKeyGenUtils +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase +import org.joda.time.DateTime +import org.joda.time.format.DateTimeFormat +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.slf4j.LoggerFactory + +import java.io.IOException + +/** + * Tests Spark SQL DML with custom key generator and write configs. 
+ */ +class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { + private val LOG = LoggerFactory.getLogger(getClass) + + test("Test Spark SQL DML with custom key generator") { +withTempDir { tmp => + Seq( +Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + TS_FORMATTER_FUNC, + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment), +Seq("MERGE_ON_READ", "segment:simple", + "(segment='cat3')", "cat3", + Seq("cat1", "cat2", "cat4", "cat5"), + TS_TO_STRING_FUNC, + (_: Integer, segment: String) => segment), +Seq("MERGE_ON_READ", "ts:timestamp", + "(ts=202312)", "202312", + Seq("202401", "202402"), + TS_FORMATTER_FUNC, + (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts)), +Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + TS_FORMATTER_FUNC, + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment) + ).foreach { testParams => +withTable(generateTableName) { tableName => + LOG.warn("Testing with parameters: " + testParams) + val tableType = testParams(0).asInstanceOf[String] + val writePartitionFields = testParams(1).asInstanceOf[String] + val dropPartitionStatement = testParams(2).asInstanceOf[String] + val droppedPartition = testParams(3).asInstanceOf[String] + val expectedPartitions = testParams(4).asInstanceOf[Seq[String]] + val tsGenFunc = testParams(5).asInstanceOf[Integer => String] + val partitionGenFunc = testParams(6).asInstanceOf[(Integer, String) => String] + val tablePath = tmp.getCanonicalPath + "/" + tableName + val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) { +TS_KEY_GEN_CONFIGS + } else { +Map[String, String]() + } + val 
timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) { +", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ") + } else { +"" + } + + prepareTableWithKeyGenerator( +tableName, tablePath, tableType, +CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig) + + // SQL CTAS with table properties containing key generator write configs + createTableWithSql(tableName, tablePath, +s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps) + + // Prepare source and test SQL INSERT INTO + val sourceTableName = tableName + "_source" +
Re: [PR] [HUDI-7269] Fallback to key based merge if positions are missing from log block [hudi]
hudi-bot commented on PR #10991: URL: https://github.com/apache/hudi/pull/10991#issuecomment-2052596559 ## CI report: * 2af03c004aef66248dae6283e9c2f1e63e062e75 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23229) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]
hudi-bot commented on PR #10957: URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052596475 ## CI report: * 966e8c85f2afb0ffaf00e12d02eb41b41c68e0bc Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23228) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052596422 ## CI report: * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (HUDI-7614) Run hudi-cli tests in Azure CI
[ https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo reassigned HUDI-7614: --- Assignee: Shawn Chang > Run hudi-cli tests in Azure CI > -- > > Key: HUDI-7614 > URL: https://issues.apache.org/jira/browse/HUDI-7614 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Assignee: Shawn Chang >Priority: Major > Fix For: 1.0.0 > > > Right now Azure CI does not run tests in the hudi-cli module. Some tests in > the hudi-cli module fail locally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7614) Run hudi-cli tests in Azure CI
[ https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7614: Description: Right now Azure CI does not run tests in the hudi-cli module. Some tests in the hudi-cli module fail locally. > Run hudi-cli tests in Azure CI > -- > > Key: HUDI-7614 > URL: https://issues.apache.org/jira/browse/HUDI-7614 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Priority: Major > Fix For: 1.0.0 > > > Right now Azure CI does not run tests in the hudi-cli module. Some tests in > the hudi-cli module fail locally. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7614) Run hudi-cli tests in Azure CI
[ https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7614: Epic Link: HUDI-4302 > Run hudi-cli tests in Azure CI > -- > > Key: HUDI-7614 > URL: https://issues.apache.org/jira/browse/HUDI-7614 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7614) Run hudi-cli tests in Azure CI
[ https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7614: Fix Version/s: 1.0.0 > Run hudi-cli tests in Azure CI > -- > > Key: HUDI-7614 > URL: https://issues.apache.org/jira/browse/HUDI-7614 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Priority: Major > Fix For: 1.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7614) Run hudi-cli tests in Azure CI
Ethan Guo created HUDI-7614: --- Summary: Run hudi-cli tests in Azure CI Key: HUDI-7614 URL: https://issues.apache.org/jira/browse/HUDI-7614 Project: Apache Hudi Issue Type: Improvement Reporter: Ethan Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua commented on code in PR #10615: URL: https://github.com/apache/hudi/pull/10615#discussion_r1563198254 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ## @@ -530,6 +539,40 @@ object ProvidesHoodieConfig { filterNullValues(overridingOpts) } + /** + * @param tableConfigKeyGeneratorClassName key generator class name in the table config. + * @param partitionFieldNamesWithoutKeyGenType partition field names without key generator types + * from the table config. + * @param catalogTable HoodieCatalogTable instance to fetch table properties. + * @return the write config value to set for "hoodie.datasource.write.partitionpath.field". + */ + def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: String, + partitionFieldNamesWithoutKeyGenType: String, + catalogTable: HoodieCatalogTable): String = { +if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) { + partitionFieldNamesWithoutKeyGenType +} else { + val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key()) Review Comment: As an example, the table looks like this in Spark catalog: ``` spark-sql (default)> DESCRIBE TABLE formatted h0; 24/04/12 13:59:53 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException _hoodie_commit_time string _hoodie_commit_seqno string _hoodie_record_key string _hoodie_partition_path string _hoodie_file_namestring id int name string pricedecimal(5,1) ts int segment string # Partition Information # col_name data_type comment ts int segment string # Detailed Table Information Catalog spark_catalog Database default Tableh0 Ownerethan Created Time Fri Apr 12 13:58:05 PDT 2024 Last Access UNKNOWN Created By Spark 3.5.1 Type EXTERNAL Provider hudi Table Properties [hoodie.datasource.write.partitionpath.field=ts:timestamp,segment:simple, preCombineField=name, primaryKey=id, provider=hudi, type=cow] Location 
file:/private/var/folders/60/wk8qzx310fd32b2dp7mhzvdcgn/T/spark-4ac6fb47-e20b-4679-a668-e28238ec3e05/h0 Serde Library org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat org.apache.hudi.hadoop.HoodieParquetInputFormat OutputFormat org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat Time taken: 1.694 seconds, Fetched 30 row(s) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua commented on code in PR #10615: URL: https://github.com/apache/hudi/pull/10615#discussion_r1563196323 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ## @@ -530,6 +539,40 @@ object ProvidesHoodieConfig { filterNullValues(overridingOpts) } + /** + * @param tableConfigKeyGeneratorClassName key generator class name in the table config. + * @param partitionFieldNamesWithoutKeyGenType partition field names without key generator types + * from the table config. + * @param catalogTable HoodieCatalogTable instance to fetch table properties. + * @return the write config value to set for "hoodie.datasource.write.partitionpath.field". + */ + def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: String, + partitionFieldNamesWithoutKeyGenType: String, + catalogTable: HoodieCatalogTable): String = { +if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) { + partitionFieldNamesWithoutKeyGenType +} else { + val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key()) Review Comment: Yes, the table properties associated with `HoodieCatalogTable` are persisted across Spark sessions. The persisted partition field write config `hoodie.datasource.write.partitionpath.field` is a custom config outside Spark, which is used by Hudi logic only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836740#comment-17836740 ] Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM: retest delete where delete precombine is less than insert {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((12, "A", 2)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val deleteData = Seq((9, "B", 2)) val deletes = spark.createDataFrame(deleteData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). 
option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3),(8, "B", 3)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Avro, useFGReader: false, tableType: cow {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |12 |A |2 | |8 |B |3 | +---+---+--+ {code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |12 |A |2 | |10 |B |1 | +---+---+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |12 |A |2 | +---+---+--+ {code} merger: Spark, useFGReader: true, tableType: mor NPE merger: Spark, useFGReader: false, tableType: mor Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for was (Author: JIRAUSER295101): retest delete where delete precombine is less than insert {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts").
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836738#comment-17836738 ] Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM: retest because default payload changed in the last few days {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((9, "A", 2)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val deleteData = Seq((11, "B", 2)) val deletes = spark.createDataFrame(deleteData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). 
option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3),(9, "B", 3)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Avro, useFGReader: false, tableType: cow {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |11 |A |3 | |9 |B |3 | +---+---+--+ {code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |11 |A |3 | |10 |B |1 | +---+---+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |11 |A |3 | +---+---+--+ {code} merger: Spark, useFGReader: true, tableType: mor NPE merger: Spark, useFGReader: false, tableType: mor Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for record: null was (Author: JIRAUSER295101): retest because default payload changed in the last few days {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key").
[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836740#comment-17836740 ] Jonathan Vexler commented on HUDI-7610: --- retest delete where delete precombine is less than insert {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((12, "A", 2)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val deleteData = Seq((9, "B", 2)) val deletes = spark.createDataFrame(deleteData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). 
option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3),(8, "B", 3)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Avro, useFGReader: false, tableType: cow {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |12 |A |2 | |8 |B |3 | +---+---+--+ {code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |12 |A |2 | |10 |B |1 | +---+---+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |12 |A |2 | +---+---+--+ {code} merger: Spark, useFGReader: true, tableType: mor NPE merger: Spark, useFGReader: false, tableType: mor Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def showDeleteIsInconsistent(): Unit = { > val merger = classOf[HoodieSparkRecordMerger].getName >
Re: [PR] Setup spark and timeline services once where possible [hudi]
hudi-bot commented on PR #11009: URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052497270 ## CI report: * f6f303c7a2c89f5926d1f8dfda1d39fdd0134cba Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23226) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836738#comment-17836738 ] Jonathan Vexler commented on HUDI-7610: --- retest because default payload changed in the last few days {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number") val data = Seq((10, "A", 1), (10, "B", 1)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((9, "A", 2)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val deleteData = Seq((11, "B", 2)) val deletes = spark.createDataFrame(deleteData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). 
option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3),(9, "B", 3)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Avro, useFGReader: false, tableType: cow {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |11 |A |3 | |9 |B |3 | +---+---+--+ {code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |11 |A |3 | |10 |B |1 | +---+---+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java} +---+---+--+ |ts |key|number| +---+---+--+ |11 |A |3 | +---+---+--+ {code} merger: Spark, useFGReader: true, tableType: mor NPE merger: Spark, useFGReader: false, tableType: mor Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for record: null > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def showDeleteIsInconsistent(): Unit = { > val merger =
Re: [PR] [MINOR] Hudi CLI 'version' command output empty string [hudi]
pt657407064 commented on code in PR #10973: URL: https://github.com/apache/hudi/pull/10973#discussion_r1563160252 ## hudi-cli/src/main/resources/application.yml: ## @@ -20,4 +20,7 @@ spring: shell: history: enabled: true - name: hoodie-cmd.log \ No newline at end of file + name: hoodie-cmd.log +command: + version: +template: "classpath:version.txt" Review Comment: CI issue has been fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua commented on code in PR #10615: URL: https://github.com/apache/hudi/pull/10615#discussion_r1563151868 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ## @@ -528,6 +536,40 @@ object ProvidesHoodieConfig { filterNullValues(overridingOpts) } + /** + * @param tableConfigKeyGeneratorClassName key generator class name in the table config. + * @param partitionFieldNamesWithoutKeyGenType partition field names without key generator types + * from the table config. + * @param catalogTable HoodieCatalogTable instance to fetch table properties. + * @return the write config value to set for "hoodie.datasource.write.partitionpath.field". + */ + def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: String, + partitionFieldNamesWithoutKeyGenType: String, + catalogTable: HoodieCatalogTable): String = { +if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) { + partitionFieldNamesWithoutKeyGenType +} else { Review Comment: Flink writer should provide the correct partition field write config. The query side may have some gaps. Created [HUDI-7613](https://issues.apache.org/jira/browse/HUDI-7613) as a follow-up. ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala: ## @@ -201,8 +201,26 @@ object HoodieWriterUtils { diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n") } +// Please note that the validation of partition path fields needs the key generator class +// for the table, since the custom key generator expects a different format of +// the value of the write config "hoodie.datasource.write.partitionpath.field" +// e.g., "col:simple,ts:timestamp", whereas the table config "hoodie.table.partition.fields" +// in hoodie.properties stores "col,ts". +// The "params" here may only contain the write config of partition path field, +// so we need to pass in the validated key generator class name. 
+val validatedKeyGenClassName = if (tableConfigKeyGen != null) { Review Comment: Only the `hoodie.datasource.write.partitionpath.field` takes effect in the writer path. Before the fix, the write config is automatically set by the SQL writer based on the value of table config `hoodie.table.partition.fields`. ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ## @@ -528,6 +536,40 @@ object ProvidesHoodieConfig { filterNullValues(overridingOpts) } + /** + * @param tableConfigKeyGeneratorClassName key generator class name in the table config. + * @param partitionFieldNamesWithoutKeyGenType partition field names without key generator types + * from the table config. + * @param catalogTable HoodieCatalogTable instance to fetch table properties. + * @return the write config value to set for "hoodie.datasource.write.partitionpath.field". + */ + def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: String, + partitionFieldNamesWithoutKeyGenType: String, + catalogTable: HoodieCatalogTable): String = { +if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) { + partitionFieldNamesWithoutKeyGenType +} else { + val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key()) + val keyGenClass = ReflectionUtils.getClass(tableConfigKeyGeneratorClassName) + if (classOf[CustomKeyGenerator].equals(keyGenClass) +|| classOf[CustomAvroKeyGenerator].equals(keyGenClass)) { +// For custom key generator, we have to take the write config value from +// "hoodie.datasource.write.partitionpath.field" which contains the key generator +// type, whereas the table config only contains the prtition field names without +// key generator types. +if (writeConfigPartitionField.isDefined) { + writeConfigPartitionField.get +} else { + log.warn("Write config \"hoodie.datasource.write.partitionpath.field\" is not set for " ++ "custom key generator. 
This may fail the write operation.") + partitionFieldNamesWithoutKeyGenType Review Comment: It fails with the error message `Unable to find field names for partition path in proper format` in the `CustomKeyGenerator` indicating that the config is not set properly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use
[jira] [Updated] (HUDI-7613) Check write/query with Flink and Hive on CustomKeyGenerator
[ https://issues.apache.org/jira/browse/HUDI-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7613: Description: https://github.com/apache/hudi/pull/10615/files#r1551075779 > Check write/query with Flink and Hive on CustomKeyGenerator > --- > > Key: HUDI-7613 > URL: https://issues.apache.org/jira/browse/HUDI-7613 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Priority: Major > > https://github.com/apache/hudi/pull/10615/files#r1551075779 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7613) Check write/query with Flink and Hive on CustomKeyGenerator
Ethan Guo created HUDI-7613: --- Summary: Check write/query with Flink and Hive on CustomKeyGenerator Key: HUDI-7613 URL: https://issues.apache.org/jira/browse/HUDI-7613 Project: Apache Hudi Issue Type: Improvement Reporter: Ethan Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7613) Check write/query with Flink and Hive on CustomKeyGenerator
[ https://issues.apache.org/jira/browse/HUDI-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ethan Guo updated HUDI-7613: Fix Version/s: 1.0.0 > Check write/query with Flink and Hive on CustomKeyGenerator > --- > > Key: HUDI-7613 > URL: https://issues.apache.org/jira/browse/HUDI-7613 > Project: Apache Hudi > Issue Type: Improvement >Reporter: Ethan Guo >Priority: Major > Fix For: 1.0.0 > > > https://github.com/apache/hudi/pull/10615/files#r1551075779 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836737#comment-17836737 ] Jonathan Vexler commented on HUDI-7610: --- use hoodie is deleted where delete precombine is less than insert {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number", "_hoodie_is_deleted") val data = Seq((10, "A", 1, false), (10, "B", 1, false)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((12, "A", 2, false),(9, "B", 2, true)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3, false),(8, "B", 3, false)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). 
option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Avro, useFGReader: false, tableType: mor merger: Spark, useFGReader: false, tableType: mor {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |12 |A |2 |false | |10 |B |1 |false | +---+---+--+--+ {code} merger: Avro, useFGReader: true, tableType: mor merger: Spark, useFGReader: true, tableType: mor {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |12 |A |2 |false | +---+---+--+--+ {code} > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def showDeleteIsInconsistent(): Unit = { > val merger = classOf[HoodieSparkRecordMerger].getName > //val merger = classOf[HoodieAvroRecordMerger].getName > val useFGReader = "true" > //val useFGReader = 
"false" > //val tableType = "COPY_ON_WRITE" > val tableType = "MERGE_ON_READ" > val columns = Seq("ts", "key", "rider", "driver", "fare", "number") > val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), > (10, "2", "rider-B", "driver-B", 27.70, 1), > (10, "3", "rider-C", "driver-C", 33.90, 10), > (-1, "4", "rider-D", "driver-D", 34.15, 6), > (10, "5", "rider-E", "driver-E", 17.85,
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836735#comment-17836735 ] Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 7:58 PM: use hoodie is deleted: {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number", "_hoodie_is_deleted") val data = Seq((10, "A", 1, false), (10, "B", 1, false)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((9, "A", 2, false),(11, "B", 2, true)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3, false),(9, "B", 3, false)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). 
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |11 |A |3 |false | |9 |B |3 |false | +---+---+--+--+ {code} merger: Avro, useFGReader: false, tableType: mor merger: Spark, useFGReader: false, tableType: mor {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |11 |A |3 |false | |10 |B |1 |false | +---+---+--+--+ {code} merger: Avro, useFGReader: true, tableType: mor merger: Spark, useFGReader: true, tableType: mor {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |11 |A |3 |false | +---+---+--+--+ {code} was (Author: JIRAUSER295101): use hoodie is deleted: {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number", "_hoodie_is_deleted") val data = Seq((10, "A", 1, false), (10, "B", 1, false)) val inserts = 
spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite).
[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836735#comment-17836735 ] Jonathan Vexler commented on HUDI-7610: --- use hoodie is deleted: {code:java} @Test def showDeleteIsInconsistent(): Unit = { //val merger = classOf[HoodieSparkRecordMerger].getName val merger = classOf[HoodieAvroRecordMerger].getName //val useFGReader = "true" val useFGReader = "false" val tableType = "COPY_ON_WRITE" //val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "number", "_hoodie_is_deleted") val data = Seq((10, "A", 1, false), (10, "B", 1, false)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((9, "A", 2, false),(11, "B", 2, true)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val updateData2 = Seq((11, "A", 3, false),(9, "B", 3, false)) val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*) updates2.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). 
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted") finalDf.show(100, false) finalDf.show(100, false) } {code} merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |11 |A |3 |false | |9 |B |3 |false | +---+---+--+--+ {code} merger: Avro, useFGReader: false, tableType: mor merger: Spark, useFGReader: false, tableType: mor {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |11 |A |3 |false | |10 |B |1 |false | +---+---+--+--+ {code} merger: Avro, useFGReader: true, tableType: mor merger: Spark, useFGReader: true, tableType: mor {code:java} +---+---+--+--+ |ts |key|number|_hoodie_is_deleted| +---+---+--+--+ |11 |A |3 |false | +---+---+--+--+ {code} > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def showDeleteIsInconsistent(): Unit = { > val merger = classOf[HoodieSparkRecordMerger].getName > 
//val merger = classOf[HoodieAvroRecordMerger].getName > val useFGReader = "true" > //val useFGReader = "false" > //val tableType = "COPY_ON_WRITE" > val tableType = "MERGE_ON_READ" > val columns = Seq("ts", "key", "rider", "driver", "fare", "number") > val data = Seq((10, "1", "rider-A", "driver-A",
Re: [PR] [HUDI-7269] Fallback to key based merge if positions are missing from log block [hudi]
hudi-bot commented on PR #10991: URL: https://github.com/apache/hudi/pull/10991#issuecomment-2052437226 ## CI report: * 7dfe5ef7fa89cebfca107cd54ca9f417eff2ba3c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23194) * 2af03c004aef66248dae6283e9c2f1e63e062e75 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23229) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]
hudi-bot commented on PR #10957: URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052437106 ## CI report: * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222) * 966e8c85f2afb0ffaf00e12d02eb41b41c68e0bc Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23228) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052437070 ## CI report: * 37bc97b3e080cb3664405a446c0174655720d41c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221) * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Setup spark and timeline services once where possible [hudi]
hudi-bot commented on PR #11009: URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052428638 ## CI report: * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23223) * f6f303c7a2c89f5926d1f8dfda1d39fdd0134cba Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23226) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7269] Fallback to key based merge if positions are missing from log block [hudi]
hudi-bot commented on PR #10991: URL: https://github.com/apache/hudi/pull/10991#issuecomment-2052428509 ## CI report: * 7dfe5ef7fa89cebfca107cd54ca9f417eff2ba3c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23194) * 2af03c004aef66248dae6283e9c2f1e63e062e75 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]
hudi-bot commented on PR #10957: URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052428384 ## CI report: * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222) * 966e8c85f2afb0ffaf00e12d02eb41b41c68e0bc UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052428308 ## CI report: * 37bc97b3e080cb3664405a446c0174655720d41c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221) * a73f9559fc8626342b767085cf7a56f743a425fc UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Setup spark and timeline services once where possible [hudi]
hudi-bot commented on PR #11009: URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052420254 ## CI report: * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23223) * f6f303c7a2c89f5926d1f8dfda1d39fdd0134cba UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052420166 ## CI report: * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (HUDI-7612) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field
Jonathan Vexler created HUDI-7612: - Summary: HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field Key: HUDI-7612 URL: https://issues.apache.org/jira/browse/HUDI-7612 Project: Apache Hudi Issue Type: Bug Components: spark Reporter: Jonathan Vexler Assignee: Jonathan Vexler The merger handles deletes based on overwrite-with-latest semantics, but the rest of its logic behaves like the default payload. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (HUDI-7611) DELETE operation does not route preCombine/ordering field values to the delete records
Jonathan Vexler created HUDI-7611: - Summary: DELETE operation does not route preCombine/ordering field values to the delete records Key: HUDI-7611 URL: https://issues.apache.org/jira/browse/HUDI-7611 Project: Apache Hudi Issue Type: Bug Components: spark Reporter: Jonathan Vexler Assignee: Jonathan Vexler Write client just takes in a list of keys for the delete operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (HUDI-7611) DELETE operation does not route preCombine/ordering field values to the delete records
[ https://issues.apache.org/jira/browse/HUDI-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7611: -- Fix Version/s: 1.0.0 > DELETE operation does not route preCombine/ordering field values to the > delete records > -- > > Key: HUDI-7611 > URL: https://issues.apache.org/jira/browse/HUDI-7611 > Project: Apache Hudi > Issue Type: Bug > Components: spark >Reporter: Jonathan Vexler >Assignee: Jonathan Vexler >Priority: Major > Fix For: 1.0.0 > > > Write client just takes in a list of keys for the delete operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (HUDI-7565) Break-up schema evolution: port spark code to file readers
[ https://issues.apache.org/jira/browse/HUDI-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler closed HUDI-7565. - Resolution: Fixed > Break-up schema evolution: port spark code to file readers > -- > > Key: HUDI-7565 > URL: https://issues.apache.org/jira/browse/HUDI-7565 > Project: Apache Hudi > Issue Type: Sub-task >Reporter: Jonathan Vexler >Assignee: Jonathan Vexler >Priority: Major > Labels: pull-request-available > Fix For: 1.0.0 > > > [https://github.com/apache/hudi/pull/10278] is too large to review and needs > to be broken into smaller PRs. Create a PR for just the ported Spark code. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836729#comment-17836729 ] Ethan Guo edited comment on HUDI-7610 at 4/12/24 7:03 PM: -- Based on offline discussion, immediately we see two issues: (1) DELETE operation does not route preCombine/ordering field values to the delete records (2) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field (it mixes the logic of overwrite with the latest, i.e., commit time-based, and default hudi payload, i.e., order time-based) Let's create separate JIRAs for the above issues. A few more things to try to see if there are other issues: (1) use UPSERT operation with _hoodie_is_deleted field for deletes (2) for Avro merger, default payload class is the OverwriteWithLatestAvroPayload; change the payload to DefaultHoodieRecordPayload and check results was (Author: guoyihua): Based on offline discussion, immediately we see two issues: (1) DELETE operation does not route preCombine/ordering field values to the delete records (2) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field (it mixes the logic of overwrite with the latest, i.e., commit time-based, and default hudi payload, i.e., order time-based) A few more things to try to see if there are other issues: (1) use UPSERT operation with _hoodie_is_deleted field for deletes (2) for Avro merger, default payload class is the OverwriteWithLatestAvroPayload; change the payload to DefaultHoodieRecordPayload and check results > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def 
showDeleteIsInconsistent(): Unit = { > val merger = classOf[HoodieSparkRecordMerger].getName > //val merger = classOf[HoodieAvroRecordMerger].getName > val useFGReader = "true" > //val useFGReader = "false" > //val tableType = "COPY_ON_WRITE" > val tableType = "MERGE_ON_READ" > val columns = Seq("ts", "key", "rider", "driver", "fare", "number") > val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), > (10, "2", "rider-B", "driver-B", 27.70, 1), > (10, "3", "rider-C", "driver-C", 33.90, 10), > (-1, "4", "rider-D", "driver-D", 34.15, 6), > (10, "5", "rider-E", "driver-E", 17.85, 10)) > val inserts = spark.createDataFrame(data).toDF(columns: _*) > inserts.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). > option(PRECOMBINE_FIELD.key(), "ts"). > option(TABLE_TYPE.key(), tableType). > option("hoodie.table.name", "test_table"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Overwrite). > save(basePath) > val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), > (9, "2", "rider-Y", "driver-Y", 27.70, 7)) > val updates = spark.createDataFrame(updateData).toDF(columns: _*) > updates.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). > option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "upsert"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) > val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) > deletes.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). 
> option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "delete"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), > (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) > val
[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836729#comment-17836729 ] Ethan Guo commented on HUDI-7610: - Based on offline discussion, immediately we see two issues: (1) DELETE operation does not route preCombine/ordering field values to the delete records (2) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field (it mixes the logic of overwrite with the latest, i.e., commit time-based, and default hudi payload, i.e., order time-based) A few more things to try to see if there are other issues: (1) use UPSERT operation with _hoodie_is_deleted field for deletes (2) for Avro merger, default payload class is the OverwriteWithLatestAvroPayload; change the payload to DefaultHoodieRecordPayload and check results > Delete records are inconsistent depending on MOR/COW, Avro/Spark record > merger, new filegroup reader enabled/disabled > - > > Key: HUDI-7610 > URL: https://issues.apache.org/jira/browse/HUDI-7610 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core >Reporter: Jonathan Vexler >Priority: Blocker > Fix For: 1.0.0 > > > Here is a test that can be run on master: > > {code:java} > @Test > def showDeleteIsInconsistent(): Unit = { > val merger = classOf[HoodieSparkRecordMerger].getName > //val merger = classOf[HoodieAvroRecordMerger].getName > val useFGReader = "true" > //val useFGReader = "false" > //val tableType = "COPY_ON_WRITE" > val tableType = "MERGE_ON_READ" > val columns = Seq("ts", "key", "rider", "driver", "fare", "number") > val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), > (10, "2", "rider-B", "driver-B", 27.70, 1), > (10, "3", "rider-C", "driver-C", 33.90, 10), > (-1, "4", "rider-D", "driver-D", 34.15, 6), > (10, "5", "rider-E", "driver-E", 17.85, 10)) > val inserts = spark.createDataFrame(data).toDF(columns: _*) > inserts.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). 
> option(PRECOMBINE_FIELD.key(), "ts"). > option(TABLE_TYPE.key(), tableType). > option("hoodie.table.name", "test_table"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Overwrite). > save(basePath) > val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), > (9, "2", "rider-Y", "driver-Y", 27.70, 7)) > val updates = spark.createDataFrame(updateData).toDF(columns: _*) > updates.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). > option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "upsert"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) > val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) > deletes.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). > option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "delete"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), > (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) > val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: > _*) > secondUpdates.write.format("hudi"). > option(RECORDKEY_FIELD.key(), "key"). 
> option(PRECOMBINE_FIELD.key(), "ts"). > option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). > option(TABLE_TYPE.key(), tableType). > option(OPERATION.key(), "upsert"). > option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). > option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). > mode(SaveMode.Append). > save(basePath) > val df = spark.read.format("hudi"). > option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). >
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052326296 ## CI report: * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
the-other-tim-brown commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052311371 @hudi-bot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]
yihua commented on PR #10615: URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052277245 > I like that this has the benefit of not breaking tables with their existing hoodie.table.recordkey.fields, but I am curious about any other approaches you thought about. From your test code, it looks like we can't use `partitioned by (dt:int,idk:string)` when creating the table. I don't think that should block this PR from landing, but in the documentation for SQL: https://hudi.apache.org/docs/sql_ddl#create-partitioned-table I think we should add an example. Good point. I tried the `partitioned by` statement but it did not work either, due to the same issue with the write config of the partition fields. But you're right that adding a new table config indicating the partition field types should solve the problem fundamentally. We should update the SQL docs on any gaps here. > > Also, I think this change will help us to fix partition pruning which currently does not work with timestamp keygen: https://issues.apache.org/jira/browse/HUDI-6614 Right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [SUPPORT] Questions about LOG in Hudi source code [hudi]
Gatsby-Lee commented on issue #10903: URL: https://github.com/apache/hudi/issues/10903#issuecomment-2052276305 @danny0405 Thank you for your response. Can you share what should be added to the log4j2 config so that Hudi logs are printed to the AWS EMR log? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7599] add bootstrap mor legacy reader back to default source [hudi]
yihua merged PR #10990: URL: https://github.com/apache/hudi/pull/10990 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(hudi) branch master updated: [HUDI-7599] add bootstrap mor legacy reader back to default source (#10990)
This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 56aded81287 [HUDI-7599] add bootstrap mor legacy reader back to default source (#10990) 56aded81287 is described below commit 56aded81287f295cfee692f16be0adc6f175902e Author: Jon Vexler AuthorDate: Fri Apr 12 14:31:10 2024 -0400 [HUDI-7599] add bootstrap mor legacy reader back to default source (#10990) Co-authored-by: Jonathan Vexler <=> --- .../src/main/scala/org/apache/hudi/DefaultSource.scala | 12 ++-- .../hudi/functional/TestNewHoodieParquetFileFormat.java | 12 +++- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index be3d2f4ed4b..8efa8e28867 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -299,14 +299,22 @@ object DefaultSource { new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) } - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, _) => + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => if (useNewParquetFileFormat) { new HoodieMergeOnReadSnapshotHadoopFsRelationFactory( -sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build() +sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() } else { new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema) } + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) => +if (useNewParquetFileFormat) { + new HoodieMergeOnReadSnapshotHadoopFsRelationFactory( +sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build() +} else { + HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters) +} + case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => if (useNewParquetFileFormat) { new HoodieMergeOnReadIncrementalHadoopFsRelationFactory( diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java index ce462c93d1b..be2b6ff949e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java @@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -39,18 +38,13 @@ import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; import static org.junit.jupiter.api.Assertions.assertEquals; @Tag("functional") -@Disabled("HUDI-6756") public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase { private static Stream testArgs() { Stream.Builder b = Stream.builder(); -HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ}; -Integer[] nPartitions = {0, 1, 2}; -for (HoodieTableType tt : tableType) { - for (Integer n : nPartitions) { -b.add(Arguments.of(tt, n)); - } -} +b.add(Arguments.of(MERGE_ON_READ, 0)); +b.add(Arguments.of(COPY_ON_WRITE, 1)); +b.add(Arguments.of(MERGE_ON_READ, 2)); return b.build(); }
Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]
hudi-bot commented on PR #10957: URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052198778 ## CI report: * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7604] Make table name config work properly [hudi]
jonvex commented on code in PR #10998: URL: https://github.com/apache/hudi/pull/10998#discussion_r1562940754 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala: ## @@ -964,6 +964,11 @@ object DataSourceOptionsHelper { def translateConfigurations(optParams: Map[String, String]): Map[String, String] = { val translatedOpt = scala.collection.mutable.Map[String, String]() ++= optParams +if (!translatedOpt.contains(HoodieTableConfig.NAME.key()) && Review Comment: I don't think this makes things worse than they already are with spark configs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052198701 ## CI report: * 37bc97b3e080cb3664405a446c0174655720d41c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java} +---+---++-+-+--+ |ts |key|rider |driver |fare |number| +---+---++-+-+--+ |11 |1 |rider-X |driver-X |19.1 |9 | |14 |5 |rider-Z |driver-Z |17.85|3 | |10 |3 |rider-C |driver-C |33.9 |10| |10 |2 |rider-B |driver-B |27.7 |1 | |-10|4 |rider-DD|driver-DD|34.15|5 | +---+---++-+-+--+{code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+---++-+--+ |ts |key|rider |driver |fare |number| +---+---+---++-+--+ |11 |1 |rider-X|driver-X|19.1 |9 | |14 |5 |rider-Z|driver-Z|17.85|3 | |-1 |4 |rider-D|driver-D|34.15|6 | |10 |3 |rider-C|driver-C|33.9 |10| |10 |2 |rider-B|driver-B|27.7 |1 | +---+---+---++-+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
Re: [PR] [HUDI-7604] Make table name config work properly [hudi]
yihua commented on code in PR #10998: URL: https://github.com/apache/hudi/pull/10998#discussion_r1562929292 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala: ## @@ -964,6 +964,11 @@ object DataSourceOptionsHelper { def translateConfigurations(optParams: Map[String, String]): Map[String, String] = { val translatedOpt = scala.collection.mutable.Map[String, String]() ++= optParams +if (!translatedOpt.contains(HoodieTableConfig.NAME.key()) && Review Comment: Understood. Should we deprioritize this for now and look for a better way? I think it's better to have such logic in a common place instead of being scattered in a separate method, which is going to be harder to maintain. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(hudi) branch master updated: [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954)
This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new f715e8a02e8 [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954) f715e8a02e8 is described below commit f715e8a02e8ee5561274ad38bdda5e863317b240 Author: Jon Vexler AuthorDate: Fri Apr 12 13:29:12 2024 -0400 [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954) Co-authored-by: Jonathan Vexler <=> --- .../datasources/parquet/SparkParquetReader.scala | 44 .../org/apache/spark/sql/hudi/SparkAdapter.scala | 18 +- .../parquet/SparkParquetReaderBase.scala | 96 +++ .../parquet/TestSparkParquetReaderFormat.scala | 56 .../hudi/functional/TestSparkParquetReader.java| 48 .../org/apache/hudi/util/JavaConversions.scala | 22 +- .../apache/spark/sql/adapter/Spark2Adapter.scala | 20 +- .../datasources/parquet/Spark24ParquetReader.scala | 225 .../apache/spark/sql/adapter/Spark3_0Adapter.scala | 20 +- .../datasources/parquet/Spark30ParquetReader.scala | 229 + .../apache/spark/sql/adapter/Spark3_1Adapter.scala | 19 +- .../datasources/parquet/Spark31ParquetReader.scala | 242 ++ .../apache/spark/sql/adapter/Spark3_2Adapter.scala | 20 +- .../datasources/parquet/Spark32ParquetReader.scala | 267 +++ .../apache/spark/sql/adapter/Spark3_3Adapter.scala | 20 +- .../datasources/parquet/Spark33ParquetReader.scala | 268 +++ .../apache/spark/sql/adapter/Spark3_4Adapter.scala | 20 +- .../datasources/parquet/Spark34ParquetReader.scala | 277 .../apache/spark/sql/adapter/Spark3_5Adapter.scala | 20 +- .../datasources/parquet/Spark35ParquetReader.scala | 284 + 20 files changed, 2206 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala new file mode 100644 index 000..920e4cb0e0b --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +trait SparkParquetReader extends Serializable { + /** + * Read an individual parquet file + * + * @param file parquet file to read + * @param requiredSchema desired output schema of the data + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param sharedConf the Hadoop configuration + * @return iterator of rows read from the file. The declared element type is [[InternalRow]], but elements may actually be [[ColumnarBatch]] + */ + def read(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter], + sharedConf: Configuration): Iterator[InternalRow] +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 1c6111afe47..91fe6dabc2e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -19,6 +19,7 @@ package
Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]
yihua merged PR #10954: URL: https://github.com/apache/hudi/pull/10954 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]
yihua commented on code in PR #10954: URL: https://github.com/apache/hudi/pull/10954#discussion_r1562924017 ## hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkHoodieParquetReader.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.util.JavaConversions; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("functional") +public class TestSparkHoodieParquetReader extends TestBootstrapReadBase { Review Comment: @jonvex could you create a follow-up ticket to move the utils in `TestBootstrapReadBase` to a common util class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java} +---+---++-+-+--+ |ts |key|rider |driver |fare |number| +---+---++-+-+--+ |11 |1 |rider-X |driver-X |19.1 |9 | |14 |5 |rider-Z |driver-Z |17.85|3 | |10 |3 |rider-C |driver-C |33.9 |10| |10 |2 |rider-B |driver-B |27.7 |1 | |-10|4 |rider-DD|driver-DD|34.15|5 | +---+---++-+-+--+{code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+---++-+--+ |ts |key|rider |driver |fare |number| +---+---+---++-+--+ |11 |1 |rider-X|driver-X|19.1 |9 | |14 |5 |rider-Z|driver-Z|17.85|3 | |-1 |4 |rider-D|driver-D|34.15|6 | |10 |3 |rider-C|driver-C|33.9 |10| |10 |2 |rider-B|driver-B|27.7 |1 | +---+---+---++-+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
Re: [PR] Setup spark and timeline services once where possible [hudi]
hudi-bot commented on PR #11009: URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052115383 ## CI report: * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23223) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Setup spark and timeline services once where possible [hudi]
hudi-bot commented on PR #11009: URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052104271 ## CI report: * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052104198 ## CI report: * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java} +---+---++-+-+--+ |ts |key|rider |driver |fare |number| +---+---++-+-+--+ |11 |1 |rider-X |driver-X |19.1 |9 | |14 |5 |rider-Z |driver-Z |17.85|3 | |10 |3 |rider-C |driver-C |33.9 |10| |10 |2 |rider-B |driver-B |27.7 |1 | |-10|4 |rider-DD|driver-DD|34.15|5 | +---+---++-+-+--+{code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+---++-+--+ |ts |key|rider |driver |fare |number| +---+---+---++-+--+ |11 |1 |rider-X|driver-X|19.1 |9 | |14 |5 |rider-Z|driver-Z|17.85|3 | |-1 |4 |rider-D|driver-D|34.15|6 | |10 |3 |rider-C|driver-C|33.9 |10| |10 |2 |rider-B|driver-B|27.7 |1 | +---+---+---++-+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
[ https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Vexler updated HUDI-7610: -- Description: Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). 
save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java} +---+---++-+-+--+ |ts |key|rider |driver |fare |number| +---+---++-+-+--+ |11 |1 |rider-X |driver-X |19.1 |9 | |14 |5 |rider-Z |driver-Z |17.85|3 | |10 |3 |rider-C |driver-C |33.9 |10| |10 |2 |rider-B |driver-B |27.7 |1 | |-10|4 |rider-DD|driver-DD|34.15|5 | +---+---++-+-+--+{code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+---++-+--+ |ts |key|rider |driver |fare |number| +---+---+---++-+--+ |11 |1 |rider-X|driver-X|19.1 |9 | |14 |5 |rider-Z|driver-Z|17.85|3 | |-1 |4 |rider-D|driver-D|34.15|6 | |10 |3 |rider-C|driver-C|33.9 |10| |10 |2 |rider-B|driver-B|27.7 |1 | +---+---+---++-+--+ {code} merger: Avro, useFGReader: true, tableType: mor {code:java}
[jira] [Created] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled
Jonathan Vexler created HUDI-7610: - Summary: Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled Key: HUDI-7610 URL: https://issues.apache.org/jira/browse/HUDI-7610 Project: Apache Hudi Issue Type: Bug Components: reader-core Reporter: Jonathan Vexler Fix For: 1.0.0 Here is a test that can be run on master: {code:java} @Test def showDeleteIsInconsistent(): Unit = { val merger = classOf[HoodieSparkRecordMerger].getName //val merger = classOf[HoodieAvroRecordMerger].getName val useFGReader = "true" //val useFGReader = "false" //val tableType = "COPY_ON_WRITE" val tableType = "MERGE_ON_READ" val columns = Seq("ts", "key", "rider", "driver", "fare", "number") val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7), (10, "2", "rider-B", "driver-B", 27.70, 1), (10, "3", "rider-C", "driver-C", 33.90, 10), (-1, "4", "rider-D", "driver-D", 34.15, 6), (10, "5", "rider-E", "driver-E", 17.85, 10)) val inserts = spark.createDataFrame(data).toDF(columns: _*) inserts.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(TABLE_TYPE.key(), tableType). option("hoodie.table.name", "test_table"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Overwrite). save(basePath) val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9), (9, "2", "rider-Y", "driver-Y", 27.70, 7)) val updates = spark.createDataFrame(updateData).toDF(columns: _*) updates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). 
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6)) val deletes = spark.createDataFrame(deletesData).toDF(columns: _*) deletes.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "delete"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, "4", "rider-DD", "driver-DD", 34.15, 5)) val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*) secondUpdates.write.format("hudi"). option(RECORDKEY_FIELD.key(), "key"). option(PRECOMBINE_FIELD.key(), "ts"). option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table"). option(TABLE_TYPE.key(), tableType). option(OPERATION.key(), "upsert"). option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"). option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true"). mode(SaveMode.Append). save(basePath) val df = spark.read.format("hudi"). option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader). 
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "false").load(basePath) val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number") finalDf.show(100,false) }{code} There are 4 different outcomes: merger: Avro, useFGReader: false, tableType: cow merger: Avro, useFGReader: true, tableType: cow merger: Spark, useFGReader: false, tableType: cow merger: Spark, useFGReader: true, tableType: cow {code:java} +---+---++-+-+--+ |ts |key|rider |driver |fare |number| +---+---++-+-+--+ |11 |1 |rider-X |driver-X |19.1 |9 | |14 |5 |rider-Z |driver-Z |17.85|3 | |10 |3 |rider-C |driver-C |33.9 |10| |10 |2 |rider-B |driver-B |27.7 |1 | |-10|4 |rider-DD|driver-DD|34.15|5 | +---+---++-+-+--+{code} merger: Avro, useFGReader: false, tableType: mor {code:java} +---+---+---++-+--+ |ts |key|rider |driver |fare |number| +---+---+---++-+--+ |11 |1 |rider-X|driver-X|19.1 |9 | |14 |5 |rider-Z|driver-Z|17.85|3 | |-1 |4
[PR] Setup spark and timeline services once where possible [hudi]
the-other-tim-brown opened a new pull request, #11009: URL: https://github.com/apache/hudi/pull/11009 ### Change Logs _Describe context and summary for this change. Highlight if any code was copied._ ### Impact _Describe any public API or user-facing feature change or any performance impact._ ### Risk level (write none, low, medium, or high below) _If medium or high, explain what verification was done to mitigate the risks._ ### Documentation Update _Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none"._ - _The config description must be updated if new configs are added or the default values of existing configs are changed_ - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the [instructions](https://hudi.apache.org/contribute/developer-setup#website) to make changes to the website._ ### Contributor's checklist - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute) - [ ] Change Logs and Impact were stated clearly - [ ] Adequate tests were added if applicable - [ ] CI passed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]
hudi-bot commented on PR #10957: URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052016581 ## CI report: * 15acc2e870fb880a56de561be9abb72f28fa588d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23179) * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]
hudi-bot commented on PR #10954: URL: https://github.com/apache/hudi/pull/10954#issuecomment-2052016503 ## CI report: * 8f1ba6d46d8777f39c522d8bcac545ba3d4fd544 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23211) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]
hudi-bot commented on PR #10957: URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052005116 ## CI report: * 15acc2e870fb880a56de561be9abb72f28fa588d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23179) * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2051927813 ## CI report: * 088f69ed54db32d1686caa4f457f6fc9aed0 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23131) * 37bc97b3e080cb3664405a446c0174655720d41c Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2051912011 ## CI report: * e7dde68f9c2bda3e1045d3bcda6c2472072395a0 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23218) * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DO NOT MERGE][HUDI-7566] Add schema evolution to spark file readers [hudi]
hudi-bot commented on PR #10956: URL: https://github.com/apache/hudi/pull/10956#issuecomment-2051911694 ## CI report: * 088f69ed54db32d1686caa4f457f6fc9aed0 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23131) * 37bc97b3e080cb3664405a446c0174655720d41c UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]
hudi-bot commented on PR #11008: URL: https://github.com/apache/hudi/pull/11008#issuecomment-2051898070 ## CI report: * e7dde68f9c2bda3e1045d3bcda6c2472072395a0 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23218) * baaff5d03b4199e0aa188492cfa8a5fe2908a47e UNKNOWN Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]
hudi-bot commented on PR #10954: URL: https://github.com/apache/hudi/pull/10954#issuecomment-2051897716 ## CI report: * 8f1ba6d46d8777f39c522d8bcac545ba3d4fd544 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23211) Bot commands @hudi-bot supports the following commands: - `@hudi-bot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org