(hudi) branch master updated: [HUDI-7378] Fix Spark SQL DML with custom key generator (#10615)

2024-04-12 Thread yihua
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 17ea14ab6d6 [HUDI-7378] Fix Spark SQL DML with custom key generator (#10615)
17ea14ab6d6 is described below

commit 17ea14ab6d6a8ca7ecef2cfcdbc67b0c87f23987
Author: Y Ethan Guo 
AuthorDate: Fri Apr 12 22:51:03 2024 -0700

[HUDI-7378] Fix Spark SQL DML with custom key generator (#10615)
---
 .../factory/HoodieSparkKeyGeneratorFactory.java|   4 +
 .../org/apache/hudi/util/SparkKeyGenUtils.scala|  16 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  20 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala  |  60 ++-
 .../spark/sql/hudi/TestProvidesHoodieConfig.scala  |  79 +++
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   5 +-
 .../TestSparkSqlWithCustomKeyGenerator.scala   | 571 +
 7 files changed, 742 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index 1ea5adcd6b4..dcc2eaec9eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -79,6 +79,10 @@ public class HoodieSparkKeyGeneratorFactory {
 
   public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
     String keyGeneratorClass = getKeyGeneratorClassName(props);
+    return createKeyGenerator(keyGeneratorClass, props);
+  }
+
+  public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedProperties props) throws IOException {
     boolean autoRecordKeyGen = KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props)
         //Need to prevent overwriting the keygen for spark sql merge into because we need to extract
         //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 7b91ae5a728..bd094464096 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -21,8 +21,8 @@ import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
 
 object SparkKeyGenUtils {
 
@@ -35,6 +35,20 @@ object SparkKeyGenUtils {
 getPartitionColumns(keyGenerator, props)
   }
 
+  /**
+   * @param KeyGenClassNameOption key generator class name if present.
+   * @param props config properties.
+   * @return partition column names only, concatenated by ","
+   */
+  def getPartitionColumns(KeyGenClassNameOption: Option[String], props: TypedProperties): String = {
+    val keyGenerator = if (KeyGenClassNameOption.isEmpty) {
+      HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+    } else {
+      HoodieSparkKeyGeneratorFactory.createKeyGenerator(KeyGenClassNameOption.get, props)
+    }
+    getPartitionColumns(keyGenerator, props)
+  }
+
   /**
* @param keyGen key generator class name
* @return partition columns
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 63495b0eede..5df773542d6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -201,8 +201,26 @@ object HoodieWriterUtils {
       diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
     }

+    // Please note that the validation of partition path fields 

Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua merged PR #10615:
URL: https://github.com/apache/hudi/pull/10615


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT]After compacting, there are a large number of logs with size 0, and they can never be cleared. [hudi]

2024-04-12 Thread via GitHub


MrAladdin commented on issue #11007:
URL: https://github.com/apache/hudi/issues/11007#issuecomment-205291

   > rollback the compaction
   
   I'm not sure which compaction to roll back or how to locate it, since the table has already been compacted multiple times. If it's not addressed, will it be cleared automatically later? Is there any specific documentation on this issue? I'd like to quickly understand the underlying mechanism.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT]There is a deltacommit that remains in the REQUESTED state [hudi]

2024-04-12 Thread via GitHub


MrAladdin commented on issue #11010:
URL: https://github.com/apache/hudi/issues/11010#issuecomment-2052898550

   > You can trigger revert with Hudi CLI.
   
   How do I restart it? Could you give me a specific command example? Also, I would like to ask why serial deltacommits would occur in this situation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563527600


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
+                                                 filePath: Path,
+                                                 requiredSchema: StructType,
+                                                 partitionSchema: StructType) {
+  // Fetch internal schema
+  private lazy val internalSchemaStr: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+
+  private lazy val querySchemaOption: util.Option[InternalSchema] = pruneInternalSchema(internalSchemaStr, requiredSchema)
+
+  var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+  private lazy val tablePath: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+  private lazy val fileSchema: InternalSchema = if (shouldUseInternalSchema) {
+    val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+    val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+    InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+  } else {
+    null
+  }
+
+  def rebuildFilterFromParquet(filter: Filter): Filter = {
+    rebuildFilterFromParquetHelper(filter, fileSchema, querySchemaOption.orElse(null))
+  }
+
+  private def rebuildFilterFromParquetHelper(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
+    if (fileSchema == null || querySchema == null) {
+      oldFilter
+    } else {
+      oldFilter match {
+        case eq: EqualTo =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute)
+        case eqs: EqualNullSafe =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute)
+        case gt: GreaterThan =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute)
+        case gtr: GreaterThanOrEqual =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute)
+        case lt: LessThan 

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563514428


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
+                                                 filePath: Path,
+                                                 requiredSchema: StructType,
+                                                 partitionSchema: StructType) {
+  // Fetch internal schema
+  private lazy val internalSchemaStr: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+
+  private lazy val querySchemaOption: util.Option[InternalSchema] = pruneInternalSchema(internalSchemaStr, requiredSchema)
+
+  var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+  private lazy val tablePath: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+  private lazy val fileSchema: InternalSchema = if (shouldUseInternalSchema) {
+    val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+    val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+    InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+  } else {
+    null
+  }
+
+  def rebuildFilterFromParquet(filter: Filter): Filter = {
+    rebuildFilterFromParquetHelper(filter, fileSchema, querySchemaOption.orElse(null))
+  }
+
+  private def rebuildFilterFromParquetHelper(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
+    if (fileSchema == null || querySchema == null) {
+      oldFilter
+    } else {
+      oldFilter match {
+        case eq: EqualTo =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute)
+        case eqs: EqualNullSafe =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute)
+        case gt: GreaterThan =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute)
+        case gtr: GreaterThanOrEqual =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute)
+        case lt: LessThan 

Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10615:
URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052767220

   
   ## CI report:
   
   * 805ba35b65afbb1daccbcf00291fd520a69c5584 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23232)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563399841


##
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }

Review Comment:
   Fold the check `schemaEvolutionUtils.shouldUseInternalSchema` into `schemaEvolutionUtils.buildVectorizedReader` (returning a new `VectorizedParquetRecordReader` inside it when the internal schema is disabled) and simplify the logic here to:
   ```suggestion
      val vectorizedReader = schemaEvolutionUtils.buildVectorizedReader(
        convertTz,
        datetimeRebaseMode,
        enableOffHeapColumnVector,
        taskContext,
        capacity)
   ```
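
   For illustration, a self-contained Scala sketch of the pattern being suggested (generic names, not the PR's actual classes or the Hudi API): the factory method owns the feature-flag branch so every call site stays a one-liner.
   ```scala
   // Generic sketch of "fold the branch into the factory" (names are illustrative only).
   object ReaderFactoryExample {
     sealed trait Reader
     final case class PlainReader(capacity: Int) extends Reader
     final case class SchemaEvolvedReader(capacity: Int) extends Reader

     // The factory decides which reader to build; callers no longer check the flag.
     def buildReader(useInternalSchema: Boolean, capacity: Int): Reader =
       if (useInternalSchema) SchemaEvolvedReader(capacity) else PlainReader(capacity)

     def main(args: Array[String]): Unit = {
       println(buildReader(useInternalSchema = true, capacity = 4096))   // SchemaEvolvedReader(4096)
       println(buildReader(useInternalSchema = false, capacity = 4096))  // PlainReader(4096)
     }
   }
   ```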



##
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala:
##
@@ -141,8 +150,20 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
 }
 val taskContext = Option(TaskContext.get())
 if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+  val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {

Review Comment:
   Do we already have tests around schema evolution using the new spark file 
readers?



##
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }

Review Comment:
   Same for readers for other Spark versions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052737016

   
   ## CI report:
   
   * 08eee17c0e936c02e100b65aeba27f81a232452c Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23231)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


the-other-tim-brown commented on code in PR #11008:
URL: https://github.com/apache/hudi/pull/11008#discussion_r1563428947


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java:
##
@@ -89,10 +89,13 @@ protected List getSmallFiles(String partitionPath) {
   private List getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
     // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
     // pending compaction
+    Comparator comparator = Comparator.comparing(fileSlice -> getTotalFileSize(fileSlice))
+        .thenComparing(FileSlice::getFileId);
     if (table.getIndex().canIndexLogFiles()) {
       return table.getSliceView()
           .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
           .filter(this::isSmallFile)
+          .sorted(comparator)
           .collect(Collectors.toList());

Review Comment:
   I think it makes sense to prefer the smallest files first as candidates to 
minimize IO
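
   To illustrate the determinism point (generic Scala, not Hudi's FileSlice API): sorting by size alone leaves equal-sized slices in an arbitrary relative order, while adding the file id as a secondary key makes the candidate order stable across runs.
   ```scala
   // Illustrative only: deterministic ordering via a secondary sort key.
   object DeterministicOrderingExample {
     final case class Slice(fileId: String, totalSize: Long)

     def main(args: Array[String]): Unit = {
       val slices = Seq(Slice("b", 100L), Slice("a", 100L), Slice("c", 50L))
       // Smallest first; ties on size are broken by fileId, so the result is stable.
       val ordered = slices.sortBy(s => (s.totalSize, s.fileId))
       println(ordered)  // List(Slice(c,50), Slice(a,100), Slice(b,100))
     }
   }
   ```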



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


danny0405 commented on code in PR #11008:
URL: https://github.com/apache/hudi/pull/11008#discussion_r1563427282


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java:
##
@@ -89,10 +89,13 @@ protected List getSmallFiles(String partitionPath) {
   private List getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
     // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
     // pending compaction
+    Comparator comparator = Comparator.comparing(fileSlice -> getTotalFileSize(fileSlice))
+        .thenComparing(FileSlice::getFileId);
     if (table.getIndex().canIndexLogFiles()) {
       return table.getSliceView()
           .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
           .filter(this::isSmallFile)
+          .sorted(comparator)
           .collect(Collectors.toList());

Review Comment:
   Should we just fix the tests? Do we have gains for the sort in production?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7609] Support array field type whose element type can be nullable [hudi]

2024-04-12 Thread via GitHub


danny0405 commented on code in PR #11006:
URL: https://github.com/apache/hudi/pull/11006#discussion_r1563425831


##
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java:
##
@@ -140,7 +141,7 @@ private static String convertGroupField(GroupType field) {
     ValidationUtils.checkArgument(field.getFieldCount() == 1, "Illegal List type: " + field);
     Type repeatedType = field.getType(0);
     if (isElementType(repeatedType, field.getName())) {
-      return arrayType(repeatedType, false);
+      return arrayType(repeatedType, true);

Review Comment:
   Can we write a simple test for it.
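
   As a hedged illustration of what the nullable-element array looks like on the Spark side (plain Spark API, not a test of Parquet2SparkSchemaUtils itself):
   ```scala
   import org.apache.spark.sql.types.{ArrayType, StringType}

   object NullableArrayElementExample {
     def main(args: Array[String]): Unit = {
       // An array whose elements may be null: after the fix the converted type should
       // carry containsNull = true instead of containsNull = false.
       val arr = ArrayType(StringType, containsNull = true)
       println(arr.json)  // {"type":"array","elementType":"string","containsNull":true}
     }
   }
   ```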



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT]There is a deltacommit that remains in the REQUESTED state [hudi]

2024-04-12 Thread via GitHub


danny0405 commented on issue #11010:
URL: https://github.com/apache/hudi/issues/11010#issuecomment-2052727390

   You can trigger revert with Hudi CLI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7608] Fix Flink table creation configuration not taking effect when writing… [hudi]

2024-04-12 Thread via GitHub


danny0405 commented on code in PR #11005:
URL: https://github.com/apache/hudi/pull/11005#discussion_r1563423256


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala:
##
@@ -43,6 +43,11 @@ object HoodieOptionConfig {
*/
   val SQL_VALUE_TABLE_TYPE_MOR = "mor"
 
+  /**
+   * The short name for the value of index type.
+   */
+  val SQL_VALUE_INDEX_TYPE = "index.type"

Review Comment:
   Maybe we can fix the options in hudi catalog.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT]After compacting, there are a large number of logs with size 0, and they can never be cleared. [hudi]

2024-04-12 Thread via GitHub


danny0405 commented on issue #11007:
URL: https://github.com/apache/hudi/issues/11007#issuecomment-2052724244

   You can roll back the compaction with the CLI; the cleaner will eventually clean these logs. Before 1.0, log cleaning actually appends new log blocks to the corrupt files, which does not remove the files instantly; these files will eventually be cleaned up by the configured cleaning strategies.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT] StreamWriteFunction support Exectly-Once in Flink ? [hudi]

2024-04-12 Thread via GitHub


danny0405 commented on issue #11004:
URL: https://github.com/apache/hudi/issues/11004#issuecomment-2052723145

   The checkpoint would trigger a commit to the Hudi table.
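
   For context, a minimal Flink sketch (standard DataStream API; the interval value is an arbitrary example): the Hudi sink commits as part of each successful checkpoint, so enabling checkpointing is what drives commits.
   ```scala
   import org.apache.flink.streaming.api.CheckpointingMode
   import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

   object FlinkCheckpointSketch {
     def main(args: Array[String]): Unit = {
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       // Each completed checkpoint triggers a commit on the Hudi table written by the pipeline.
       env.enableCheckpointing(60000L)  // 60s interval, example value
       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
       // ... build the Hudi sink pipeline here, then call env.execute(...)
     }
   }
   ```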


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [SUPPORT]There is a deltacommit that remains in the REQUESTED state [hudi]

2024-04-12 Thread via GitHub


MrAladdin opened a new issue, #11010:
URL: https://github.com/apache/hudi/issues/11010

   **Describe the problem you faced**
   There is a deltacommit that remains in the REQUESTED state. Does it have any impact, will it cause data loss, and how should I deal with it next?
   
   
   **Environment Description**
   
   * Hudi version :0.14.1
   
   * Spark version :3.4.1
   
   * Hive version :3.1.2
   
   * Hadoop version :3.1.3
   
   * Storage (HDFS/S3/GCS..) :hdfs
   
   * Running on Docker? (yes/no) :no
   
   **Additional context**
   
   spark structured streaming, upsert, mor (record_index)
   
   .writeStream
     .format("hudi")
     .option("hoodie.table.base.file.format", "PARQUET")
     .option("hoodie.allow.empty.commit", "true")
     .option("hoodie.datasource.write.drop.partition.columns", "false")
     .option("hoodie.table.services.enabled", "true")
     .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-v1-writer-1")
     .option(PRECOMBINE_FIELD.key(), "date_kbiUdate")
     .option(RECORDKEY_FIELD.key(), "records_key")
     .option(PARTITIONPATH_FIELD.key(), "partition_index_date")
     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
     .option("hoodie.combine.before.upsert", "true")
     .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
     //markers
     .option("hoodie.write.markers.type", "DIRECT")
   
     //timeline server
     .option("hoodie.embed.timeline.server", "true")
   
     //File System View Storage Configurations
     .option("hoodie.filesystem.view.remote.timeout.secs", "1200")
     .option("hoodie.filesystem.view.remote.retry.enable", "true")
     .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500")
     .option("hoodie.filesystem.view.remote.retry.max_numbers", "15")
     .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000")
   
     //schema cache
     .option("hoodie.schema.cache.enable", "true")
   
     //spark write
     .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false")
     .option("hoodie.datasource.write.streaming.retry.count", "6")
     .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000")
   
     //metadata
     .option("hoodie.metadata.enable", "true")
     .option("hoodie.metadata.index.async", "false")
     .option("hoodie.metadata.index.check.timeout.seconds", "900")
     .option("hoodie.auto.adjust.lock.configs", "true")
     .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true")
     .option("hoodie.metadata.index.column.stats.enable", "false")
     .option("hoodie.metadata.index.column.stats.parallelism", "100")
     .option("hoodie.metadata.index.column.stats.file.group.count", "4")
     .option("hoodie.metadata.index.column.stats.column.list", "date_udate,date_publishedAt")
     .option("hoodie.metadata.compact.max.delta.commits", "10")
   
   
     //metadata
     .option("hoodie.metadata.record.index.enable", "true")
     .option("hoodie.index.type", "RECORD_INDEX")
     .option("hoodie.metadata.max.init.parallelism", "10")
     .option("hoodie.metadata.record.index.min.filegroup.count", "10")
     .option("hoodie.metadata.record.index.max.filegroup.count", "1")
     .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824")
     .option("hoodie.metadata.auto.initialize", "true")
     .option("hoodie.metadata.record.index.growth.factor", "2.0")
     .option("hoodie.metadata.max.logfile.size", "2147483648")
     .option("hoodie.metadata.log.compaction.enable", "false")
     .option("hoodie.metadata.log.compaction.blocks.threshold", "5")
     .option("hoodie.metadata.max.deltacommits.when_pending", "1000")
   
     //file size
     .option("hoodie.parquet.field_id.write.enabled", "true")
     .option("hoodie.copyonwrite.insert.auto.split", "true")
     .option("hoodie.record.size.estimation.threshold", "1.0")
     .option("hoodie.parquet.block.size", "536870912")
     .option("hoodie.parquet.max.file.size", "536870912")
     .option("hoodie.parquet.small.file.limit", "314572800")
     .option("hoodie.logfile.max.size", "536870912")
     .option("hoodie.logfile.data.block.max.size", "536870912")
     .option("hoodie.logfile.to.parquet.compression.ratio", "0.35")
   
 

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052706711

   
   ## CI report:
   
   * c8f507bcac03c7183893400487a1885400c46853 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10615:
URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052703726

   
   ## CI report:
   
   * dfab8e1285bf0241eea2e71f9d85607c647446d7 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23212)
 
   * 805ba35b65afbb1daccbcf00291fd520a69c5584 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23232)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10615:
URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052699814

   
   ## CI report:
   
   * dfab8e1285bf0241eea2e71f9d85607c647446d7 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23212)
 
   * 805ba35b65afbb1daccbcf00291fd520a69c5584 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (HUDI-7615) Mark a few write configs with the correct sinceVersion

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7615:

Component/s: configs

> Mark a few write configs with the correct sinceVersion
> --
>
> Key: HUDI-7615
> URL: https://issues.apache.org/jira/browse/HUDI-7615
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: configs
>Reporter: Ethan Guo
>Assignee: tao pan
>Priority: Major
>
> The following write configs are not associated with the correct since version 
> (ConfigProperty#sinceVersion), which should be fixed.  Correct version is 
> listed below:
> hoodie.metadata.log.compaction.enable -> 0.14.0
> hoodie.log.compaction.enable -> 0.14.0
> hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 
> 0.10.1
> hoodie.datasource.write.payload.type -> 1.0.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7615) Mark a few write configs with the correct sinceVersion

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7615:

Description: 
The following write configs are not associated with the correct since version 
(ConfigProperty#sinceVersion), which should be fixed.  Correct version is 
listed below:

hoodie.metadata.log.compaction.enable -> 0.14.0
hoodie.log.compaction.enable -> 0.14.0
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 
0.10.1
hoodie.datasource.write.payload.type -> 1.0.0

 

> Mark a few write configs with the correct sinceVersion
> --
>
> Key: HUDI-7615
> URL: https://issues.apache.org/jira/browse/HUDI-7615
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Assignee: tao pan
>Priority: Major
>
> The following write configs are not associated with the correct since version 
> (ConfigProperty#sinceVersion), which should be fixed.  Correct version is 
> listed below:
> hoodie.metadata.log.compaction.enable -> 0.14.0
> hoodie.log.compaction.enable -> 0.14.0
> hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 
> 0.10.1
> hoodie.datasource.write.payload.type -> 1.0.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7615) Mark a few write configs with the correct sinceVersion

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7615:

Fix Version/s: 0.15.0
   1.0.0

> Mark a few write configs with the correct sinceVersion
> --
>
> Key: HUDI-7615
> URL: https://issues.apache.org/jira/browse/HUDI-7615
> Project: Apache Hudi
>  Issue Type: Improvement
>  Components: configs
>Reporter: Ethan Guo
>Assignee: tao pan
>Priority: Major
> Fix For: 0.15.0, 1.0.0
>
>
> The following write configs are not associated with the correct since version 
> (ConfigProperty#sinceVersion), which should be fixed.  Correct version is 
> listed below:
> hoodie.metadata.log.compaction.enable -> 0.14.0
> hoodie.log.compaction.enable -> 0.14.0
> hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> 
> 0.10.1
> hoodie.datasource.write.payload.type -> 1.0.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7615) Mark a few write configs with the correct sinceVersion

2024-04-12 Thread Ethan Guo (Jira)
Ethan Guo created HUDI-7615:
---

 Summary: Mark a few write configs with the correct sinceVersion
 Key: HUDI-7615
 URL: https://issues.apache.org/jira/browse/HUDI-7615
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Ethan Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7615) Mark a few write configs with the correct sinceVersion

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo reassigned HUDI-7615:
---

Assignee: tao pan  (was: Ethan Guo)

> Mark a few write configs with the correct sinceVersion
> --
>
> Key: HUDI-7615
> URL: https://issues.apache.org/jira/browse/HUDI-7615
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Assignee: tao pan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (HUDI-7615) Mark a few write configs with the correct sinceVersion

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo reassigned HUDI-7615:
---

Assignee: Ethan Guo

> Mark a few write configs with the correct sinceVersion
> --
>
> Key: HUDI-7615
> URL: https://issues.apache.org/jira/browse/HUDI-7615
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Assignee: Ethan Guo
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10615:
URL: https://github.com/apache/hudi/pull/10615#discussion_r1563323590


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##
@@ -530,6 +539,40 @@ object ProvidesHoodieConfig {
   filterNullValues(overridingOpts)
   }
 
+  /**
+   * @param tableConfigKeyGeneratorClassName     key generator class name in the table config.
+   * @param partitionFieldNamesWithoutKeyGenType partition field names without key generator types
+   *                                             from the table config.
+   * @param catalogTable                         HoodieCatalogTable instance to fetch table properties.
+   * @return the write config value to set for "hoodie.datasource.write.partitionpath.field".
+   */
+  def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: String,
+                                       partitionFieldNamesWithoutKeyGenType: String,
+                                       catalogTable: HoodieCatalogTable): String = {
+    if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) {
+      partitionFieldNamesWithoutKeyGenType
+    } else {
+      val writeConfigPartitionField = catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key())
+      val keyGenClass = ReflectionUtils.getClass(tableConfigKeyGeneratorClassName)
+      if (classOf[CustomKeyGenerator].equals(keyGenClass)

Review Comment:
   The assumption is that these key generators should not be extended.  We 
should keep it this way for now.
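
   A small Scala sketch of the distinction being discussed (imports assumed to match the quoted code; illustrative, not the PR's exact logic): the exact-class check treats user subclasses of the built-in key generators as "not custom", whereas `isAssignableFrom` would match them too.
   ```scala
   import org.apache.hudi.common.util.ReflectionUtils  // package assumed
   import org.apache.hudi.keygen.CustomKeyGenerator

   object KeyGenClassCheckSketch {
     // Returns (exact-class match, subclass-tolerant match) for the configured key generator class.
     def checks(tableConfigKeyGeneratorClassName: String): (Boolean, Boolean) = {
       val keyGenClass = ReflectionUtils.getClass(tableConfigKeyGeneratorClassName)
       val exactMatchOnly = classOf[CustomKeyGenerator].equals(keyGenClass)                // current check: subclasses excluded
       val includingSubclasses = classOf[CustomKeyGenerator].isAssignableFrom(keyGenClass) // would also match extensions
       (exactMatchOnly, includingSubclasses)
     }
   }
   ```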



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052662120

   
   ## CI report:
   
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220)
 
   * 08eee17c0e936c02e100b65aeba27f81a232452c Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23231)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052654450

   
   ## CI report:
   
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220)
 
   * 08eee17c0e936c02e100b65aeba27f81a232452c UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052654190

   
   ## CI report:
   
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
 
   * c8f507bcac03c7183893400487a1885400c46853 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052647727

   
   ## CI report:
   
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
 
   * c8f507bcac03c7183893400487a1885400c46853 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10615:
URL: https://github.com/apache/hudi/pull/10615#discussion_r1563245298


##
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.functional.TestSparkSqlWithCustomKeyGenerator._
+import org.apache.hudi.util.SparkKeyGenUtils
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.slf4j.LoggerFactory
+
+import java.io.IOException
+
+/**
+ * Tests Spark SQL DML with custom key generator and write configs.
+ */
+class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase {
+  private val LOG = LoggerFactory.getLogger(getClass)
+
+  test("Test Spark SQL DML with custom key generator") {
+    withTempDir { tmp =>
+      Seq(
+        Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple",
+          "(ts=202401, segment='cat2')", "202401/cat2",
+          Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"),
+          TS_FORMATTER_FUNC,
+          (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment),
+        Seq("MERGE_ON_READ", "segment:simple",
+          "(segment='cat3')", "cat3",
+          Seq("cat1", "cat2", "cat4", "cat5"),
+          TS_TO_STRING_FUNC,
+          (_: Integer, segment: String) => segment),
+        Seq("MERGE_ON_READ", "ts:timestamp",
+          "(ts=202312)", "202312",
+          Seq("202401", "202402"),
+          TS_FORMATTER_FUNC,
+          (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts)),
+        Seq("MERGE_ON_READ", "ts:timestamp,segment:simple",
+          "(ts=202401, segment='cat2')", "202401/cat2",
+          Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"),
+          TS_FORMATTER_FUNC,
+          (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment)
+      ).foreach { testParams =>
+        withTable(generateTableName) { tableName =>
+          LOG.warn("Testing with parameters: " + testParams)
+          val tableType = testParams(0).asInstanceOf[String]
+          val writePartitionFields = testParams(1).asInstanceOf[String]
+          val dropPartitionStatement = testParams(2).asInstanceOf[String]
+          val droppedPartition = testParams(3).asInstanceOf[String]
+          val expectedPartitions = testParams(4).asInstanceOf[Seq[String]]
+          val tsGenFunc = testParams(5).asInstanceOf[Integer => String]
+          val partitionGenFunc = testParams(6).asInstanceOf[(Integer, String) => String]
+          val tablePath = tmp.getCanonicalPath + "/" + tableName
+          val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) {
+            TS_KEY_GEN_CONFIGS
+          } else {
+            Map[String, String]()
+          }
+          val timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) {
+            ", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")
+          } else {
+            ""
+          }
+
+          prepareTableWithKeyGenerator(
+            tableName, tablePath, tableType,
+            CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig)
+
+          // SQL CTAS with table properties containing key generator write configs
+          createTableWithSql(tableName, tablePath,
+            s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps)
+
+          // Prepare source and test SQL INSERT INTO
+          val sourceTableName = tableName + "_source"
+ 

Re: [PR] [HUDI-7269] Fallback to key based merge if positions are missing from log block [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10991:
URL: https://github.com/apache/hudi/pull/10991#issuecomment-2052596559

   
   ## CI report:
   
   * 2af03c004aef66248dae6283e9c2f1e63e062e75 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23229)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10957:
URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052596475

   
   ## CI report:
   
   * 966e8c85f2afb0ffaf00e12d02eb41b41c68e0bc Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23228)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052596422

   
   ## CI report:
   
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (HUDI-7614) Run hudi-cli tests in Azure CI

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo reassigned HUDI-7614:
---

Assignee: Shawn Chang

> Run hudi-cli tests in Azure CI
> --
>
> Key: HUDI-7614
> URL: https://issues.apache.org/jira/browse/HUDI-7614
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Assignee: Shawn Chang
>Priority: Major
> Fix For: 1.0.0
>
>
> Right now Azure CI does not run tests in hudi-cli module.  Some tests in 
> hudi-cli module fail locally.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7614) Run hudi-cli tests in Azure CI

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7614:

Description: Right now Azure CI does not run tests in hudi-cli module.  
Some tests in hudi-cli module fail locally.

> Run hudi-cli tests in Azure CI
> --
>
> Key: HUDI-7614
> URL: https://issues.apache.org/jira/browse/HUDI-7614
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Priority: Major
> Fix For: 1.0.0
>
>
> Right now Azure CI does not run tests in hudi-cli module.  Some tests in 
> hudi-cli module fail locally.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7614) Run hudi-cli tests in Azure CI

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7614:

Epic Link: HUDI-4302

> Run hudi-cli tests in Azure CI
> --
>
> Key: HUDI-7614
> URL: https://issues.apache.org/jira/browse/HUDI-7614
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7614) Run hudi-cli tests in Azure CI

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7614:

Fix Version/s: 1.0.0

> Run hudi-cli tests in Azure CI
> --
>
> Key: HUDI-7614
> URL: https://issues.apache.org/jira/browse/HUDI-7614
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Priority: Major
> Fix For: 1.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7614) Run hudi-cli tests in Azure CI

2024-04-12 Thread Ethan Guo (Jira)
Ethan Guo created HUDI-7614:
---

 Summary: Run hudi-cli tests in Azure CI
 Key: HUDI-7614
 URL: https://issues.apache.org/jira/browse/HUDI-7614
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Ethan Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10615:
URL: https://github.com/apache/hudi/pull/10615#discussion_r1563198254


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##
@@ -530,6 +539,40 @@ object ProvidesHoodieConfig {
   filterNullValues(overridingOpts)
   }
 
+  /**
+   * @param tableConfigKeyGeneratorClassName key generator class name in 
the table config.
+   * @param partitionFieldNamesWithoutKeyGenType partition field names without 
key generator types
+   * from the table config.
+   * @param catalogTable HoodieCatalogTable instance 
to fetch table properties.
+   * @return the write config value to set for 
"hoodie.datasource.write.partitionpath.field".
+   */
+  def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: 
String,
+   partitionFieldNamesWithoutKeyGenType: 
String,
+   catalogTable: HoodieCatalogTable): 
String = {
+if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) {
+  partitionFieldNamesWithoutKeyGenType
+} else {
+  val writeConfigPartitionField = 
catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key())

Review Comment:
   As an example, the table looks like this in Spark catalog:
   ```
   spark-sql (default)> DESCRIBE TABLE formatted h0;
   24/04/12 13:59:53 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
   _hoodie_commit_time  string  
   _hoodie_commit_seqno string  
   _hoodie_record_key   string  
   _hoodie_partition_path   string  
   _hoodie_file_namestring  
   id   int 
   name string  
   pricedecimal(5,1)
   ts   int 
   segment  string  
   # Partition Information  
   # col_name   data_type   comment 
   ts   int 
   segment  string  

   # Detailed Table Information 
   Catalog  spark_catalog   
   Database default 
   Tableh0  
   Ownerethan   
   Created Time Fri Apr 12 13:58:05 PDT 2024
   Last Access  UNKNOWN 
   Created By   Spark 3.5.1 
   Type EXTERNAL
   Provider hudi
   Table Properties 
[hoodie.datasource.write.partitionpath.field=ts:timestamp,segment:simple, 
preCombineField=name, primaryKey=id, provider=hudi, type=cow]   
  
   Location 
file:/private/var/folders/60/wk8qzx310fd32b2dp7mhzvdcgn/T/spark-4ac6fb47-e20b-4679-a668-e28238ec3e05/h0
 
   Serde Library
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe 

   InputFormat  org.apache.hudi.hadoop.HoodieParquetInputFormat 

   OutputFormat 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat  

   Time taken: 1.694 seconds, Fetched 30 row(s)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10615:
URL: https://github.com/apache/hudi/pull/10615#discussion_r1563196323


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##
@@ -530,6 +539,40 @@ object ProvidesHoodieConfig {
   filterNullValues(overridingOpts)
   }
 
+  /**
+   * @param tableConfigKeyGeneratorClassName key generator class name in 
the table config.
+   * @param partitionFieldNamesWithoutKeyGenType partition field names without 
key generator types
+   * from the table config.
+   * @param catalogTable HoodieCatalogTable instance 
to fetch table properties.
+   * @return the write config value to set for 
"hoodie.datasource.write.partitionpath.field".
+   */
+  def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: 
String,
+   partitionFieldNamesWithoutKeyGenType: 
String,
+   catalogTable: HoodieCatalogTable): 
String = {
+if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) {
+  partitionFieldNamesWithoutKeyGenType
+} else {
+  val writeConfigPartitionField = 
catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key())

Review Comment:
   Yes, the table properties associated with `HoodieCatalogTable` are persisted 
across Spark sessions.  The persisted partition field write config 
`hoodie.datasource.write.partitionpath.field` is a custom config outside Spark, 
which is used by Hudi logic only.
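   
   As a quick illustration (a sketch, not part of this PR; the table name `h0` is 
   borrowed from the `DESCRIBE TABLE` output in the other comment), the persisted 
   write config shows up as a table property even in a brand-new Spark session:
   ```scala
   // Sketch: read back the persisted Hudi write config in a fresh Spark session.
   // The table name "h0" is an assumption taken from the DESCRIBE output above.
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().appName("check-tblproperties").getOrCreate()
   spark.sql("SHOW TBLPROPERTIES h0").show(truncate = false)
   // Expected to include a row like:
   // hoodie.datasource.write.partitionpath.field | ts:timestamp,segment:simple
   ```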



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836740#comment-17836740
 ] 

Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM:


retest delete where delete precombine is less than insert
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((12, "A", 2))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val deleteData = Seq((9, "B", 2))
  val deletes = spark.createDataFrame(deleteData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3),(8, "B", 3))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
 

 

merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Avro, useFGReader: false, tableType: cow
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|12 |A  |2 |
|8  |B  |3 |
+---+---+--+ {code}
 

merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|12 |A  |2 |
|10 |B  |1 |
+---+---+--+ {code}
 

merger: Avro, useFGReader: true, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|12 |A  |2 |
+---+---+--+ {code}
 

merger: Spark, useFGReader: true, tableType: mor
NPE

 

merger: Spark, useFGReader: false, tableType: mor
Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for


was (Author: JIRAUSER295101):
retest delete where delete precombine is less than insert
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").

[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836738#comment-17836738
 ] 

Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 8:40 PM:


retest because default payload changed in the last few days 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((9, "A", 2))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val deleteData = Seq((11, "B", 2))
  val deletes = spark.createDataFrame(deleteData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3),(9, "B", 3))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
 

 

merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Avro, useFGReader: false, tableType: cow
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|11 |A  |3 |
|9  |B  |3 |
+---+---+--+ {code}
 

 

merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|11 |A  |3 |
|10 |B  |1 |
+---+---+--+ {code}
 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|11 |A  |3 |
+---+---+--+ {code}
 

 

merger: Spark, useFGReader: true, tableType: mor
NPE

 

 

merger: Spark, useFGReader: false, tableType: mor
Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null 
for record: null


was (Author: JIRAUSER295101):
retest because default payload changed in the last few days 


{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").

[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836740#comment-17836740
 ] 

Jonathan Vexler commented on HUDI-7610:
---

retest delete where delete precombine is less than insert
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((12, "A", 2))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val deleteData = Seq((9, "B", 2))
  val deletes = spark.createDataFrame(deleteData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3),(8, "B", 3))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Avro, useFGReader: false, tableType: cow
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|12 |A  |2 |
|8  |B  |3 |
+---+---+--+ {code}
 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|12 |A  |2 |
|10 |B  |1 |
+---+---+--+ {code}
 
merger: Avro, useFGReader: true, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|12 |A  |2 |
+---+---+--+ {code}
 
merger: Spark, useFGReader: true, tableType: mor
NPE


merger: Spark, useFGReader: false, tableType: mor
Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null for

> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> -
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: reader-core
>Reporter: Jonathan Vexler
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = classOf[HoodieSparkRecordMerger].getName
> 

Re: [PR] Setup spark and timeline services once where possible [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11009:
URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052497270

   
   ## CI report:
   
   * f6f303c7a2c89f5926d1f8dfda1d39fdd0134cba Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23226)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836738#comment-17836738
 ] 

Jonathan Vexler commented on HUDI-7610:
---

retest because default payload changed in the last few days 


{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number")
  val data = Seq((10, "A", 1), (10, "B", 1))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((9, "A", 2))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val deleteData = Seq((11, "B", 2))
  val deletes = spark.createDataFrame(deleteData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3),(9, "B", 3))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}



merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Avro, useFGReader: false, tableType: cow
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|11 |A  |3 |
|9  |B  |3 |
+---+---+--+ {code}
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|11 |A  |3 |
|10 |B  |1 |
+---+---+--+ {code}
merger: Avro, useFGReader: true, tableType: mor

{code:java}
+---+---+--+
|ts |key|number|
+---+---+--+
|11 |A  |3 |
+---+---+--+ {code}
merger: Spark, useFGReader: true, tableType: mor
NPE



merger: Spark, useFGReader: false, tableType: mor
Caused by: org.apache.hudi.exception.HoodieException: Ordering value is null 
for record: null

> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> -
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: reader-core
>Reporter: Jonathan Vexler
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = 

Re: [PR] [MINOR] Hudi CLI 'version' command output empty string [hudi]

2024-04-12 Thread via GitHub


pt657407064 commented on code in PR #10973:
URL: https://github.com/apache/hudi/pull/10973#discussion_r1563160252


##
hudi-cli/src/main/resources/application.yml:
##
@@ -20,4 +20,7 @@ spring:
   shell:
 history:
   enabled: true
-  name: hoodie-cmd.log
\ No newline at end of file
+  name: hoodie-cmd.log
+command:
+  version:
+template: "classpath:version.txt"

Review Comment:
   CI issue has been fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10615:
URL: https://github.com/apache/hudi/pull/10615#discussion_r1563151868


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##
@@ -528,6 +536,40 @@ object ProvidesHoodieConfig {
   filterNullValues(overridingOpts)
   }
 
+  /**
+   * @param tableConfigKeyGeneratorClassName key generator class name in 
the table config.
+   * @param partitionFieldNamesWithoutKeyGenType partition field names without 
key generator types
+   * from the table config.
+   * @param catalogTable HoodieCatalogTable instance 
to fetch table properties.
+   * @return the write config value to set for 
"hoodie.datasource.write.partitionpath.field".
+   */
+  def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: 
String,
+   partitionFieldNamesWithoutKeyGenType: 
String,
+   catalogTable: HoodieCatalogTable): 
String = {
+if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) {
+  partitionFieldNamesWithoutKeyGenType
+} else {

Review Comment:
   Flink writer should provide the correct partition field write config.  The 
query side may have some gaps.
   
   Created [HUDI-7613](https://issues.apache.org/jira/browse/HUDI-7613) as a 
follow-up.



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##
@@ -201,8 +201,26 @@ object HoodieWriterUtils {
   
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
 }
 
+// Please note that the validation of partition path fields needs the 
key generator class
+// for the table, since the custom key generator expects a different 
format of
+// the value of the write config 
"hoodie.datasource.write.partitionpath.field"
+// e.g., "col:simple,ts:timestamp", whereas the table config 
"hoodie.table.partition.fields"
+// in hoodie.properties stores "col,ts".
+// The "params" here may only contain the write config of partition 
path field,
+// so we need to pass in the validated key generator class name.
+val validatedKeyGenClassName = if (tableConfigKeyGen != null) {

Review Comment:
   Only the `hoodie.datasource.write.partitionpath.field` takes effect in the 
writer path.  Before the fix, the write config is automatically set by the SQL 
writer based on the value of table config `hoodie.table.partition.fields`.
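   
   For context, a minimal sketch of the writer-side config that `CustomKeyGenerator` 
   expects (the column names `ts`/`segment`, `df`, and `basePath` are assumptions for 
   illustration, not taken from this PR):
   ```scala
   // Sketch: DataFrame writer config for CustomKeyGenerator (df/basePath assumed).
   df.write.format("hudi").
     option("hoodie.datasource.write.recordkey.field", "id").
     option("hoodie.datasource.write.keygenerator.class",
       "org.apache.hudi.keygen.CustomKeyGenerator").
     // Field names WITH key generator types ("simple" or "timestamp").
     option("hoodie.datasource.write.partitionpath.field", "ts:timestamp,segment:simple").
     // Timestamp-based fields also need the time-based key generator configs
     // (timestamp type, output format), omitted here for brevity.
     option("hoodie.table.name", "h0").
     mode("append").
     save(basePath)
   // hoodie.properties then records hoodie.table.partition.fields=ts,segment (no types).
   ```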



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##
@@ -528,6 +536,40 @@ object ProvidesHoodieConfig {
   filterNullValues(overridingOpts)
   }
 
+  /**
+   * @param tableConfigKeyGeneratorClassName key generator class name in 
the table config.
+   * @param partitionFieldNamesWithoutKeyGenType partition field names without 
key generator types
+   * from the table config.
+   * @param catalogTable HoodieCatalogTable instance 
to fetch table properties.
+   * @return the write config value to set for 
"hoodie.datasource.write.partitionpath.field".
+   */
+  def getPartitionPathFieldWriteConfig(tableConfigKeyGeneratorClassName: 
String,
+   partitionFieldNamesWithoutKeyGenType: 
String,
+   catalogTable: HoodieCatalogTable): 
String = {
+if (StringUtils.isNullOrEmpty(tableConfigKeyGeneratorClassName)) {
+  partitionFieldNamesWithoutKeyGenType
+} else {
+  val writeConfigPartitionField = 
catalogTable.catalogProperties.get(PARTITIONPATH_FIELD.key())
+  val keyGenClass = 
ReflectionUtils.getClass(tableConfigKeyGeneratorClassName)
+  if (classOf[CustomKeyGenerator].equals(keyGenClass)
+|| classOf[CustomAvroKeyGenerator].equals(keyGenClass)) {
+// For custom key generator, we have to take the write config value 
from
+// "hoodie.datasource.write.partitionpath.field" which contains the 
key generator
+// type, whereas the table config only contains the partition field 
names without
+// key generator types.
+if (writeConfigPartitionField.isDefined) {
+  writeConfigPartitionField.get
+} else {
+  log.warn("Write config 
\"hoodie.datasource.write.partitionpath.field\" is not set for "
++ "custom key generator. This may fail the write operation.")
+  partitionFieldNamesWithoutKeyGenType

Review Comment:
   It fails with the error message `Unable to find field names for partition 
path in proper format` in the `CustomKeyGenerator` indicating that the config 
is not set properly.
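   
   To make the expected format concrete, a small sketch (the property values are 
   assumptions for illustration only):
   ```scala
   // Sketch: CustomKeyGenerator expects each partition field as "<field>:<type>".
   import org.apache.hudi.common.config.TypedProperties

   val props = new TypedProperties()
   props.put("hoodie.datasource.write.recordkey.field", "id")
   // Bare field names (what hoodie.table.partition.fields stores) are not enough here;
   // building partition paths would fail with
   // "Unable to find field names for partition path in proper format".
   props.put("hoodie.datasource.write.partitionpath.field", "segment")
   // The properly formatted value carries the key generator type per field:
   props.put("hoodie.datasource.write.partitionpath.field", "ts:timestamp,segment:simple")
   ```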



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use 

[jira] [Updated] (HUDI-7613) Check write/query with Flink and Hive on CustomKeyGenerator

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7613:

Description: https://github.com/apache/hudi/pull/10615/files#r1551075779

> Check write/query with Flink and Hive on CustomKeyGenerator
> ---
>
> Key: HUDI-7613
> URL: https://issues.apache.org/jira/browse/HUDI-7613
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Priority: Major
>
> https://github.com/apache/hudi/pull/10615/files#r1551075779



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7613) Check write/query with Flink and Hive on CustomKeyGenerator

2024-04-12 Thread Ethan Guo (Jira)
Ethan Guo created HUDI-7613:
---

 Summary: Check write/query with Flink and Hive on 
CustomKeyGenerator
 Key: HUDI-7613
 URL: https://issues.apache.org/jira/browse/HUDI-7613
 Project: Apache Hudi
  Issue Type: Improvement
Reporter: Ethan Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7613) Check write/query with Flink and Hive on CustomKeyGenerator

2024-04-12 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-7613:

Fix Version/s: 1.0.0

> Check write/query with Flink and Hive on CustomKeyGenerator
> ---
>
> Key: HUDI-7613
> URL: https://issues.apache.org/jira/browse/HUDI-7613
> Project: Apache Hudi
>  Issue Type: Improvement
>Reporter: Ethan Guo
>Priority: Major
> Fix For: 1.0.0
>
>
> https://github.com/apache/hudi/pull/10615/files#r1551075779



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836737#comment-17836737
 ] 

Jonathan Vexler commented on HUDI-7610:
---

use _hoodie_is_deleted where delete precombine is less than insert


{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number", "_hoodie_is_deleted")
  val data = Seq((10, "A", 1, false), (10, "B", 1, false))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((12, "A", 2, false),(9, "B", 2, true))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3, false),(8, "B", 3, false))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Avro, useFGReader: false, tableType: mor
merger: Spark, useFGReader: false, tableType: mor
{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|12 |A  |2 |false |
|10 |B  |1 |false |
+---+---+--+--+ {code}
 
merger: Avro, useFGReader: true, tableType: mor
merger: Spark, useFGReader: true, tableType: mor

{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|12 |A  |2 |false |
+---+---+--+--+ {code}

> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> -
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: reader-core
>Reporter: Jonathan Vexler
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = classOf[HoodieSparkRecordMerger].getName
>   //val merger = classOf[HoodieAvroRecordMerger].getName
>   val useFGReader = "true"
>   //val useFGReader = "false"
>   //val tableType = "COPY_ON_WRITE"
>   val tableType = "MERGE_ON_READ"
>   val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
>   val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
> (10, "2", "rider-B", "driver-B", 27.70, 1),
> (10, "3", "rider-C", "driver-C", 33.90, 10),
> (-1, "4", "rider-D", "driver-D", 34.15, 6),
> (10, "5", "rider-E", "driver-E", 17.85, 

[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836735#comment-17836735
 ] 

Jonathan Vexler edited comment on HUDI-7610 at 4/12/24 7:58 PM:


use _hoodie_is_deleted:
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number", "_hoodie_is_deleted")
  val data = Seq((10, "A", 1, false), (10, "B", 1, false))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((9, "A", 2, false),(11, "B", 2, true))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3, false),(9, "B", 3, false))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
 
merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|11 |A  |3 |false |
|9  |B  |3 |false |
+---+---+--+--+ {code}
 

merger: Avro, useFGReader: false, tableType: mor
merger: Spark, useFGReader: false, tableType: mor
{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|11 |A  |3 |false |
|10 |B  |1 |false |
+---+---+--+--+ {code}
 

 

merger: Avro, useFGReader: true, tableType: mor
merger: Spark, useFGReader: true, tableType: mor
{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|11 |A  |3 |false |
+---+---+--+--+ {code}
 


was (Author: JIRAUSER295101):
use hoodie is deleted:
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number", "_hoodie_is_deleted")
  val data = Seq((10, "A", 1, false), (10, "B", 1, false))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).

[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836735#comment-17836735
 ] 

Jonathan Vexler commented on HUDI-7610:
---

use _hoodie_is_deleted:
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  //val merger = classOf[HoodieSparkRecordMerger].getName
  val merger = classOf[HoodieAvroRecordMerger].getName
  //val useFGReader = "true"
  val useFGReader = "false"
  val tableType = "COPY_ON_WRITE"
  //val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "number", "_hoodie_is_deleted")
  val data = Seq((10, "A", 1, false), (10, "B", 1, false))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((9, "A", 2, false),(11, "B", 2, true))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val updateData2 = Seq((11, "A", 3, false),(9, "B", 3, false))
  val updates2 = spark.createDataFrame(updateData2).toDF(columns: _*)
  updates2.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "number", "_hoodie_is_deleted")
  finalDf.show(100, false)
  finalDf.show(100, false)
} {code}
 
merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow

{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|11 |A  |3 |false |
|9  |B  |3 |false |
+---+---+--+--+ {code}




merger: Avro, useFGReader: false, tableType: mor
merger: Spark, useFGReader: false, tableType: mor
{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|11 |A  |3 |false |
|10 |B  |1 |false |
+---+---+--+--+ {code}
merger: Avro, useFGReader: true, tableType: mor
merger: Spark, useFGReader: true, tableType: mor

{code:java}
+---+---+--+--+
|ts |key|number|_hoodie_is_deleted|
+---+---+--+--+
|11 |A  |3 |false |
+---+---+--+--+ {code}








 

> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> -
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: reader-core
>Reporter: Jonathan Vexler
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = classOf[HoodieSparkRecordMerger].getName
>   //val merger = classOf[HoodieAvroRecordMerger].getName
>   val useFGReader = "true"
>   //val useFGReader = "false"
>   //val tableType = "COPY_ON_WRITE"
>   val tableType = "MERGE_ON_READ"
>   val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
>   val data = Seq((10, "1", "rider-A", "driver-A", 

Re: [PR] [HUDI-7269] Fallback to key based merge if positions are missing from log block [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10991:
URL: https://github.com/apache/hudi/pull/10991#issuecomment-2052437226

   
   ## CI report:
   
   * 7dfe5ef7fa89cebfca107cd54ca9f417eff2ba3c Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23194)
 
   * 2af03c004aef66248dae6283e9c2f1e63e062e75 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23229)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10957:
URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052437106

   
   ## CI report:
   
   * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222)
 
   * 966e8c85f2afb0ffaf00e12d02eb41b41c68e0bc Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23228)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052437070

   
   ## CI report:
   
   * 37bc97b3e080cb3664405a446c0174655720d41c Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221)
 
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Setup spark and timeline services once where possible [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11009:
URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052428638

   
   ## CI report:
   
   * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23223)
 
   * f6f303c7a2c89f5926d1f8dfda1d39fdd0134cba Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23226)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7269] Fallback to key based merge if positions are missing from log block [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10991:
URL: https://github.com/apache/hudi/pull/10991#issuecomment-2052428509

   
   ## CI report:
   
   * 7dfe5ef7fa89cebfca107cd54ca9f417eff2ba3c Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23194)
 
   * 2af03c004aef66248dae6283e9c2f1e63e062e75 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10957:
URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052428384

   
   ## CI report:
   
   * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222)
 
   * 966e8c85f2afb0ffaf00e12d02eb41b41c68e0bc UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052428308

   
   ## CI report:
   
   * 37bc97b3e080cb3664405a446c0174655720d41c Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221)
 
   * a73f9559fc8626342b767085cf7a56f743a425fc UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Setup spark and timeline services once where possible [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11009:
URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052420254

   
   ## CI report:
   
   * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23223)
 
   * f6f303c7a2c89f5926d1f8dfda1d39fdd0134cba UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052420166

   
   ## CI report:
   
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (HUDI-7612) HoodieSparkRecordMerger does not handle deletes based on the preCombine/ordering field

2024-04-12 Thread Jonathan Vexler (Jira)
Jonathan Vexler created HUDI-7612:
-

 Summary: HoodieSparkRecordMerger does not handle deletes based on 
the preCombine/ordering field
 Key: HUDI-7612
 URL: https://issues.apache.org/jira/browse/HUDI-7612
 Project: Apache Hudi
  Issue Type: Bug
  Components: spark
Reporter: Jonathan Vexler
Assignee: Jonathan Vexler


The merger handles deletes based on overwrite-with-latest semantics, but the rest 
of the logic follows the default payload.
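
A rough sketch (plain Scala stand-ins, not Hudi classes; values assumed for 
illustration) of the two delete semantics this description contrasts:
{code:java}
// Simplified stand-in type, not a Hudi class.
case class Rec(ts: Int, deleted: Boolean)

// Commit-time semantics (overwrite with latest): the newer write always wins.
def commitTimeMerge(older: Rec, newer: Rec): Option[Rec] =
  if (newer.deleted) None else Some(newer)

// Ordering/event-time semantics (default payload): the higher ordering value wins.
def eventTimeMerge(older: Rec, newer: Rec): Option[Rec] = {
  val winner = if (newer.ts >= older.ts) newer else older
  if (winner.deleted) None else Some(winner)
}

// With older = Rec(10, false) and a delete newer = Rec(9, true):
// commitTimeMerge -> None (record dropped)
// eventTimeMerge  -> Some(Rec(10, false)) (record kept)
{code}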



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (HUDI-7611) DELETE operation does not route preCombine/ordering field values to the delete records

2024-04-12 Thread Jonathan Vexler (Jira)
Jonathan Vexler created HUDI-7611:
-

 Summary: DELETE operation does not route preCombine/ordering field 
values to the delete records
 Key: HUDI-7611
 URL: https://issues.apache.org/jira/browse/HUDI-7611
 Project: Apache Hudi
  Issue Type: Bug
  Components: spark
Reporter: Jonathan Vexler
Assignee: Jonathan Vexler


Write client just takes in a list of keys for the delete operation.
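
A minimal sketch (key and partition path values are assumptions) of the gap this 
describes, since a delete is expressed purely as a HoodieKey:
{code:java}
// Sketch: a delete carries only (recordKey, partitionPath), so the
// preCombine/ordering value of the incoming delete has nowhere to travel.
import org.apache.hudi.common.model.HoodieKey

val deleteKeys = java.util.Arrays.asList(new HoodieKey("B", "2020/01/01"))
// e.g. writeClient.delete(deleteKeys, instantTime)  // keys only, no "ts" to compare against
{code}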



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (HUDI-7611) DELETE operation does not route preCombine/ordering field values to the delete records

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7611:
--
Fix Version/s: 1.0.0

> DELETE operation does not route preCombine/ordering field values to the 
> delete records
> --
>
> Key: HUDI-7611
> URL: https://issues.apache.org/jira/browse/HUDI-7611
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: spark
>Reporter: Jonathan Vexler
>Assignee: Jonathan Vexler
>Priority: Major
> Fix For: 1.0.0
>
>
> Write client just takes in a list of keys for the delete operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (HUDI-7565) Break-up schema evolution: port spark code to file readers

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler closed HUDI-7565.
-
Resolution: Fixed

> Break-up schema evolution: port spark code to file readers
> --
>
> Key: HUDI-7565
> URL: https://issues.apache.org/jira/browse/HUDI-7565
> Project: Apache Hudi
>  Issue Type: Sub-task
>Reporter: Jonathan Vexler
>Assignee: Jonathan Vexler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.0.0
>
>
> [https://github.com/apache/hudi/pull/10278] is too large to review and needs 
> to be broken into smaller prs. Create PR for just the ported spark code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Ethan Guo (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836729#comment-17836729
 ] 

Ethan Guo edited comment on HUDI-7610 at 4/12/24 7:03 PM:
--

Based on offline discussion, we immediately see two issues:

(1) DELETE operation does not route preCombine/ordering field values to the 
delete records
(2) HoodieSparkRecordMerger does not handle deletes based on the 
preCombine/ordering field (it mixes the logic of overwrite with the latest, 
i.e., commit time-based, and default hudi payload, i.e., order time-based)

Let's create separate JIRAs for the above issues.

A few more things to try to see if there are other issues:

(1) use UPSERT operation with _hoodie_is_deleted field for deletes
(2) for the Avro merger, the default payload class is OverwriteWithLatestAvroPayload; change the 
payload to DefaultHoodieRecordPayload and check the results


was (Author: guoyihua):
Based on offline discussion, immediately we see two issues:

(1) DELETE operation does not route preCombine/ordering field values to the 
delete records
(2) HoodieSparkRecordMerger does not handle deletes based on the 
preCombine/ordering field (it mixes the logic of overwrite with the latest, 
i.e., commit time-based, and default hudi payload, i.e., order time-based)

A few more things to try to see if there are other issues:

(1) use UPSERT operation with _hoodie_is_deleted field for deletes
(2) for Avro merger, default payload class is the 
OverwriteWithLatestAvroPayload; change the payload to 
DefaultHoodieRecordPayload and check results

> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> -
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: reader-core
>Reporter: Jonathan Vexler
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = classOf[HoodieSparkRecordMerger].getName
>   //val merger = classOf[HoodieAvroRecordMerger].getName
>   val useFGReader = "true"
>   //val useFGReader = "false"
>   //val tableType = "COPY_ON_WRITE"
>   val tableType = "MERGE_ON_READ"
>   val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
>   val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
> (10, "2", "rider-B", "driver-B", 27.70, 1),
> (10, "3", "rider-C", "driver-C", 33.90, 10),
> (-1, "4", "rider-D", "driver-D", 34.15, 6),
> (10, "5", "rider-E", "driver-E", 17.85, 10))
>   val inserts = spark.createDataFrame(data).toDF(columns: _*)
>   inserts.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(TABLE_TYPE.key(), tableType).
> option("hoodie.table.name", "test_table").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Overwrite).
> save(basePath)
>   val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
> (9, "2", "rider-Y", "driver-Y", 27.70, 7))
>   val updates = spark.createDataFrame(updateData).toDF(columns: _*)
>   updates.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "upsert").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
>   val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
>   val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
>   deletes.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "delete").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
>   val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), 
> (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
>   val 

[jira] [Commented] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Ethan Guo (Jira)


[ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836729#comment-17836729
 ] 

Ethan Guo commented on HUDI-7610:
-

Based on offline discussion, we immediately see two issues:

(1) DELETE operation does not route preCombine/ordering field values to the 
delete records
(2) HoodieSparkRecordMerger does not handle deletes based on the 
preCombine/ordering field (it mixes the logic of overwrite with the latest, 
i.e., commit time-based, and default hudi payload, i.e., order time-based)

A few more things to try to see if there are other issues:

(1) use UPSERT operation with _hoodie_is_deleted field for deletes
(2) for the Avro merger, the default payload class is OverwriteWithLatestAvroPayload; change the 
payload to DefaultHoodieRecordPayload and check the results
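
For reference, a rough sketch of those two variations (reusing the `deletes` DataFrame and 
`basePath` from the test below; the other write options used in the test, such as table type 
and merger, are omitted for brevity):
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

// (1) Soft delete: UPSERT the rows with _hoodie_is_deleted = true instead of using
// the DELETE operation, so the ordering value travels with the record.
val softDeletes = deletes.withColumn("_hoodie_is_deleted", lit(true))
softDeletes.write.format("hudi").
  option("hoodie.table.name", "test_table").
  option("hoodie.datasource.write.recordkey.field", "key").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.operation", "upsert").
  mode(SaveMode.Append).
  save(basePath)

// (2) For the Avro merger, set the payload class explicitly to the ordering-field-based
// one (shown on a single write here; it would be set on every write in the test).
deletes.write.format("hudi").
  option("hoodie.table.name", "test_table").
  option("hoodie.datasource.write.recordkey.field", "key").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
  option("hoodie.datasource.write.operation", "delete").
  mode(SaveMode.Append).
  save(basePath)
{code}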

> Delete records are inconsistent depending on MOR/COW, Avro/Spark record 
> merger, new filegroup reader enabled/disabled
> -
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
>  Issue Type: Bug
>  Components: reader-core
>Reporter: Jonathan Vexler
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>  
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
>   val merger = classOf[HoodieSparkRecordMerger].getName
>   //val merger = classOf[HoodieAvroRecordMerger].getName
>   val useFGReader = "true"
>   //val useFGReader = "false"
>   //val tableType = "COPY_ON_WRITE"
>   val tableType = "MERGE_ON_READ"
>   val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
>   val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
> (10, "2", "rider-B", "driver-B", 27.70, 1),
> (10, "3", "rider-C", "driver-C", 33.90, 10),
> (-1, "4", "rider-D", "driver-D", 34.15, 6),
> (10, "5", "rider-E", "driver-E", 17.85, 10))
>   val inserts = spark.createDataFrame(data).toDF(columns: _*)
>   inserts.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(TABLE_TYPE.key(), tableType).
> option("hoodie.table.name", "test_table").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Overwrite).
> save(basePath)
>   val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
> (9, "2", "rider-Y", "driver-Y", 27.70, 7))
>   val updates = spark.createDataFrame(updateData).toDF(columns: _*)
>   updates.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "upsert").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
>   val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
>   val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
>   deletes.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "delete").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
>   val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), 
> (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
>   val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: 
> _*)
>   secondUpdates.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "upsert").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
>   val df = spark.read.format("hudi").
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
> 

Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052326296

   
   ## CI report:
   
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


the-other-tim-brown commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052311371

   @hudi-bot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7378] Fix Spark SQL DML with custom key generator [hudi]

2024-04-12 Thread via GitHub


yihua commented on PR #10615:
URL: https://github.com/apache/hudi/pull/10615#issuecomment-2052277245

   > I like that this has the benefit of not breaking tables with their 
existing hoodie.table.recordkey.fields, but I am curious about any other 
approaches you thought about. From your test code, it looks like we can't use 
`partitioned by (dt:int,idk:string)` when creating the table. I don't think 
that should block this PR from landing, but in the documentation for SQL 
(https://hudi.apache.org/docs/sql_ddl#create-partitioned-table) I think we 
should add an example.
   
   Good point.  I tried the `partitioned by` statement but it did not work either, 
due to the same issue with the write config of the partition fields.  But you're 
right that adding a new table config indicating the partition field types should 
solve the problem fundamentally.  We should update the SQL docs on any gaps 
here.
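
   For the docs, something along these lines could be a starting point (a sketch only: the 
table and column names are made up, the exact option spellings should be verified against the 
SQL DDL page, and a timestamp-typed partition field would use `field:timestamp` plus the 
timestamp key generator configs):
```scala
// Sketch: a partitioned table declared through Spark SQL that routes writes through
// the custom key generator via table properties.
spark.sql(
  """
    |CREATE TABLE hudi_custom_keygen_tbl (
    |  id STRING,
    |  name STRING,
    |  ts BIGINT,
    |  dt STRING
    |) USING hudi
    |PARTITIONED BY (dt)
    |TBLPROPERTIES (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.CustomKeyGenerator',
    |  'hoodie.datasource.write.partitionpath.field' = 'dt:simple'
    |)
    |""".stripMargin)
```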
   
   > 
   > Also, I think this change will help us fix partition pruning, which 
currently does not work with the timestamp keygen: 
https://issues.apache.org/jira/browse/HUDI-6614
   
   Right.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [SUPPORT] Questions about LOG in Hudi source code [hudi]

2024-04-12 Thread via GitHub


Gatsby-Lee commented on issue #10903:
URL: https://github.com/apache/hudi/issues/10903#issuecomment-2052276305

   @danny0405
   Thank you for your response.
   Can you share what needs to be added to the log4j2 config to get the Hudi 
logs into the AWS EMR logs?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7599] add bootstrap mor legacy reader back to default source [hudi]

2024-04-12 Thread via GitHub


yihua merged PR #10990:
URL: https://github.com/apache/hudi/pull/10990


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(hudi) branch master updated: [HUDI-7599] add bootstrap mor legacy reader back to default source (#10990)

2024-04-12 Thread yihua
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new 56aded81287 [HUDI-7599] add bootstrap mor legacy reader back to 
default source (#10990)
56aded81287 is described below

commit 56aded81287f295cfee692f16be0adc6f175902e
Author: Jon Vexler 
AuthorDate: Fri Apr 12 14:31:10 2024 -0400

[HUDI-7599] add bootstrap mor legacy reader back to default source (#10990)

Co-authored-by: Jonathan Vexler <=>
---
 .../src/main/scala/org/apache/hudi/DefaultSource.scala   | 12 ++--
 .../hudi/functional/TestNewHoodieParquetFileFormat.java  | 12 +++-
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index be3d2f4ed4b..8efa8e28867 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -299,14 +299,22 @@ object DefaultSource {
   new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
 }
 
-  case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, _) =>
+  case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
 if (useNewParquetFileFormat) {
   new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
-sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
 } else {
   new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
 }
 
+  case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
+if (useNewParquetFileFormat) {
+  new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
+sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
+} else {
+  HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
+}
+
   case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
 if (useNewParquetFileFormat) {
   new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index ce462c93d1b..be2b6ff949e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -39,18 +38,13 @@ import static 
org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Tag("functional")
-@Disabled("HUDI-6756")
 public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
 
   private static Stream testArgs() {
 Stream.Builder b = Stream.builder();
-HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ};
-Integer[] nPartitions = {0, 1, 2};
-for (HoodieTableType tt : tableType) {
-  for (Integer n : nPartitions) {
-b.add(Arguments.of(tt, n));
-  }
-}
+b.add(Arguments.of(MERGE_ON_READ, 0));
+b.add(Arguments.of(COPY_ON_WRITE, 1));
+b.add(Arguments.of(MERGE_ON_READ, 2));
 return b.build();
   }
 



Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10957:
URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052198778

   
   ## CI report:
   
   * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7604] Make table name config work properly [hudi]

2024-04-12 Thread via GitHub


jonvex commented on code in PR #10998:
URL: https://github.com/apache/hudi/pull/10998#discussion_r1562940754


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##
@@ -964,6 +964,11 @@ object DataSourceOptionsHelper {
 
   def translateConfigurations(optParams: Map[String, String]): Map[String, 
String] = {
 val translatedOpt = scala.collection.mutable.Map[String, String]() ++= 
optParams
+if (!translatedOpt.contains(HoodieTableConfig.NAME.key()) &&

Review Comment:
   I don't think this makes things worse than they already are with spark 
configs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052198701

   
   ## CI report:
   
   * 37bc97b3e080cb3664405a446c0174655720d41c Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---++-+-+--+
|ts |key|rider   |driver   |fare |number|
+---+---++-+-+--+
|11 |1  |rider-X |driver-X |19.1 |9 |
|14 |5  |rider-Z |driver-Z |17.85|3 |
|10 |3  |rider-C |driver-C |33.9 |10|
|10 |2  |rider-B |driver-B |27.7 |1 |
|-10|4  |rider-DD|driver-DD|34.15|5 |
+---+---++-+-+--+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+---++-+--+
|ts |key|rider  |driver  |fare |number|
+---+---+---++-+--+
|11 |1  |rider-X|driver-X|19.1 |9 |
|14 |5  |rider-Z|driver-Z|17.85|3 |
|-1 |4  |rider-D|driver-D|34.15|6 |
|10 |3  |rider-C|driver-C|33.9 |10|
|10 |2  |rider-B|driver-B|27.7 |1 |
+---+---+---++-+--+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

Re: [PR] [HUDI-7604] Make table name config work properly [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10998:
URL: https://github.com/apache/hudi/pull/10998#discussion_r1562929292


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##
@@ -964,6 +964,11 @@ object DataSourceOptionsHelper {
 
   def translateConfigurations(optParams: Map[String, String]): Map[String, 
String] = {
 val translatedOpt = scala.collection.mutable.Map[String, String]() ++= 
optParams
+if (!translatedOpt.contains(HoodieTableConfig.NAME.key()) &&

Review Comment:
   Understood.  Should we deprioritize this for now and look for a better way?  
I think it's better to have such logic in a common place instead of being 
scattered in a separate method, which is going to be harder to maintain.
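
   Put differently, a minimal sketch of the kind of shared helper meant here (hypothetical, not 
existing code):
```scala
// Illustrative only: derive hoodie.table.name once, in one shared place, when the
// caller has not supplied it, instead of repeating the fallback in each code path.
def withDefaultTableName(opts: Map[String, String], tableIdentifier: String): Map[String, String] =
  if (opts.contains("hoodie.table.name")) opts
  else opts + ("hoodie.table.name" -> tableIdentifier)
```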



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(hudi) branch master updated: [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954)

2024-04-12 Thread yihua
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
 new f715e8a02e8 [HUDI-7565] Create spark file readers to read a single 
file instead of an entire partition (#10954)
f715e8a02e8 is described below

commit f715e8a02e8ee5561274ad38bdda5e863317b240
Author: Jon Vexler 
AuthorDate: Fri Apr 12 13:29:12 2024 -0400

[HUDI-7565] Create spark file readers to read a single file instead of an 
entire partition (#10954)

Co-authored-by: Jonathan Vexler <=>
---
 .../datasources/parquet/SparkParquetReader.scala   |  44 
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  18 +-
 .../parquet/SparkParquetReaderBase.scala   |  96 +++
 .../parquet/TestSparkParquetReaderFormat.scala |  56 
 .../hudi/functional/TestSparkParquetReader.java|  48 
 .../org/apache/hudi/util/JavaConversions.scala |  22 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  20 +-
 .../datasources/parquet/Spark24ParquetReader.scala | 225 
 .../apache/spark/sql/adapter/Spark3_0Adapter.scala |  20 +-
 .../datasources/parquet/Spark30ParquetReader.scala | 229 +
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |  19 +-
 .../datasources/parquet/Spark31ParquetReader.scala | 242 ++
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |  20 +-
 .../datasources/parquet/Spark32ParquetReader.scala | 267 +++
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |  20 +-
 .../datasources/parquet/Spark33ParquetReader.scala | 268 +++
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |  20 +-
 .../datasources/parquet/Spark34ParquetReader.scala | 277 
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |  20 +-
 .../datasources/parquet/Spark35ParquetReader.scala | 284 +
 20 files changed, 2206 insertions(+), 9 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
new file mode 100644
index 000..920e4cb0e0b
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+trait SparkParquetReader extends Serializable {
+  /**
+   * Read an individual parquet file
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
+   * @param filters filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf  the hadoop conf
+   * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  def read(file: PartitionedFile,
+   requiredSchema: StructType,
+   partitionSchema: StructType,
+   filters: Seq[Filter],
+   sharedConf: Configuration): Iterator[InternalRow]
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 1c6111afe47..91fe6dabc2e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -19,6 +19,7 @@
 package 

Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]

2024-04-12 Thread via GitHub


yihua merged PR #10954:
URL: https://github.com/apache/hudi/pull/10954


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10954:
URL: https://github.com/apache/hudi/pull/10954#discussion_r1562924017


##
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkHoodieParquetReader.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.util.JavaConversions;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("functional")
+public class TestSparkHoodieParquetReader extends TestBootstrapReadBase {

Review Comment:
   @jonvex could you create a follow-up ticket to move the utils in 
`TestBootstrapReadBase` to a common util class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---++-+-+--+
|ts |key|rider   |driver   |fare |number|
+---+---++-+-+--+
|11 |1  |rider-X |driver-X |19.1 |9 |
|14 |5  |rider-Z |driver-Z |17.85|3 |
|10 |3  |rider-C |driver-C |33.9 |10|
|10 |2  |rider-B |driver-B |27.7 |1 |
|-10|4  |rider-DD|driver-DD|34.15|5 |
+---+---++-+-+--+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+---++-+--+
|ts |key|rider  |driver  |fare |number|
+---+---+---++-+--+
|11 |1  |rider-X|driver-X|19.1 |9 |
|14 |5  |rider-Z|driver-Z|17.85|3 |
|-1 |4  |rider-D|driver-D|34.15|6 |
|10 |3  |rider-C|driver-C|33.9 |10|
|10 |2  |rider-B|driver-B|27.7 |1 |
+---+---+---++-+--+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

Re: [PR] Setup spark and timeline services once where possible [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11009:
URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052115383

   
   ## CI report:
   
   * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23223)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Setup spark and timeline services once where possible [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11009:
URL: https://github.com/apache/hudi/pull/11009#issuecomment-2052104271

   
   ## CI report:
   
   * 819ec8e0c3de67165bd6d54b35d5c708e28d98a0 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2052104198

   
   ## CI report:
   
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---++-+-+--+
|ts |key|rider   |driver   |fare |number|
+---+---++-+-+--+
|11 |1  |rider-X |driver-X |19.1 |9 |
|14 |5  |rider-Z |driver-Z |17.85|3 |
|10 |3  |rider-C |driver-C |33.9 |10|
|10 |2  |rider-B |driver-B |27.7 |1 |
|-10|4  |rider-DD|driver-DD|34.15|5 |
+---+---++-+-+--+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+---++-+--+
|ts |key|rider  |driver  |fare |number|
+---+---+---++-+--+
|11 |1  |rider-X|driver-X|19.1 |9 |
|14 |5  |rider-Z|driver-Z|17.85|3 |
|-1 |4  |rider-D|driver-D|34.15|6 |
|10 |3  |rider-C|driver-C|33.9 |10|
|10 |2  |rider-B|driver-B|27.7 |1 |
+---+---+---++-+--+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---++-+-+--+
|ts |key|rider   |driver   |fare |number|
+---+---++-+-+--+
|11 |1  |rider-X |driver-X |19.1 |9 |
|14 |5  |rider-Z |driver-Z |17.85|3 |
|10 |3  |rider-C |driver-C |33.9 |10|
|10 |2  |rider-B |driver-B |27.7 |1 |
|-10|4  |rider-DD|driver-DD|34.15|5 |
+---+---++-+-+--+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+---++-+--+
|ts |key|rider  |driver  |fare |number|
+---+---+---++-+--+
|11 |1  |rider-X|driver-X|19.1 |9 |
|14 |5  |rider-Z|driver-Z|17.85|3 |
|-1 |4  |rider-D|driver-D|34.15|6 |
|10 |3  |rider-C|driver-C|33.9 |10|
|10 |2  |rider-B|driver-B|27.7 |1 |
+---+---+---++-+--+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

[jira] [Created] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)
Jonathan Vexler created HUDI-7610:
-

 Summary: Delete records are inconsistent depending on MOR/COW, 
Avro/Spark record merger, new filegroup reader enabled/disabled
 Key: HUDI-7610
 URL: https://issues.apache.org/jira/browse/HUDI-7610
 Project: Apache Hudi
  Issue Type: Bug
  Components: reader-core
Reporter: Jonathan Vexler
 Fix For: 1.0.0


Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:


merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow

{code:java}
+---+---++-+-+--+
|ts |key|rider   |driver   |fare |number|
+---+---++-+-+--+
|11 |1  |rider-X |driver-X |19.1 |9 |
|14 |5  |rider-Z |driver-Z |17.85|3 |
|10 |3  |rider-C |driver-C |33.9 |10|
|10 |2  |rider-B |driver-B |27.7 |1 |
|-10|4  |rider-DD|driver-DD|34.15|5 |
+---+---++-+-+--+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+---++-+--+
|ts |key|rider  |driver  |fare |number|
+---+---+---++-+--+
|11 |1  |rider-X|driver-X|19.1 |9 |
|14 |5  |rider-Z|driver-Z|17.85|3 |
|-1 |4  

[PR] Setup spark and timeline services once where possible [hudi]

2024-04-12 Thread via GitHub


the-other-tim-brown opened a new pull request, #11009:
URL: https://github.com/apache/hudi/pull/11009

   ### Change Logs
   
   _Describe context and summary for this change. Highlight if any code was 
copied._
   
   ### Impact
   
   _Describe any public API or user-facing feature change or any performance 
impact._
   
   ### Risk level (write none, low medium or high below)
   
   _If medium or high, explain what verification was done to mitigate the 
risks._
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, 
config, or user-facing change. If not, put "none"._
   
   - _The config description must be updated if new configs are added or the 
default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. 
Please create a Jira ticket, attach the
 ticket number here and follow the 
[instruction](https://hudi.apache.org/contribute/developer-setup#website) to 
make
 changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10957:
URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052016581

   
   ## CI report:
   
   * 15acc2e870fb880a56de561be9abb72f28fa588d Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23179)
 
   * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23222)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10954:
URL: https://github.com/apache/hudi/pull/10954#issuecomment-2052016503

   
   ## CI report:
   
   * 8f1ba6d46d8777f39c522d8bcac545ba3d4fd544 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23211)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7567] Add schema evolution to the filegroup reader [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10957:
URL: https://github.com/apache/hudi/pull/10957#issuecomment-2052005116

   
   ## CI report:
   
   * 15acc2e870fb880a56de561be9abb72f28fa588d Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23179)
 
   * ee7a0e3a401dd0d2c0f2d1095256fb9f27c9802f UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2051927813

   
   ## CI report:
   
   * 088f69ed54db32d1686caa4f457f6fc9aed0 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23131)
 
   * 37bc97b3e080cb3664405a446c0174655720d41c Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2051912011

   
   ## CI report:
   
   * e7dde68f9c2bda3e1045d3bcda6c2472072395a0 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23218)
 
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23220)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE][HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2051911694

   
   ## CI report:
   
   * 088f69ed54db32d1686caa4f457f6fc9aed0 Azure: 
[SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23131)
 
   * 37bc97b3e080cb3664405a446c0174655720d41c UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Make ordering deterministic in small file selection [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #11008:
URL: https://github.com/apache/hudi/pull/11008#issuecomment-2051898070

   
   ## CI report:
   
   * e7dde68f9c2bda3e1045d3bcda6c2472072395a0 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23218)
 
   * baaff5d03b4199e0aa188492cfa8a5fe2908a47e UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7565] Create spark file readers to read a single file instead of an entire partition [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10954:
URL: https://github.com/apache/hudi/pull/10954#issuecomment-2051897716

   
   ## CI report:
   
   * 8f1ba6d46d8777f39c522d8bcac545ba3d4fd544 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23211)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


