[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1225129097


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala:
##
@@ -34,6 +34,15 @@ class HoodieParquetFileFormat extends ParquetFileFormat with 
SparkAdapterSupport
 
   override def toString: String = "Hoodie-Parquet"
 
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {

Review Comment:
   JIRA to track: HUDI-6347



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1225128128


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   We can do that.  I'll keep the existing code the same in this PR.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1225127785


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieSparkPartitionedFileUtils.scala:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * Utils on Spark [[PartitionedFile]] to adapt to type changes.
+ * Before Spark 3.4.0:
+ * ```
+ * case class PartitionedFile(
+ *   partitionValues: InternalRow,
+ *   filePath: String,
+ *   start: Long,
+ *   length: Long,
+ *   @transient locations: Array[String] = Array.empty)
+ * ```
+ * Since Spark 3.4.0, the filePath is switched to [[SparkPath]] for type safety:
+ * ```
+ * case class PartitionedFile(
+ *   partitionValues: InternalRow,
+ *   filePath: SparkPath,
+ *   start: Long,
+ *   length: Long,
+ *   @transient locations: Array[String] = Array.empty,
+ *   modificationTime: Long = 0L,
+ *   fileSize: Long = 0L)
+ * ```
+ */
+trait HoodieSparkPartitionedFileUtils extends Serializable {
+  /**
+   * Gets the Hadoop [[Path]] instance from Spark [[PartitionedFile]] instance.
+   *
+   * @param partitionedFile Spark [[PartitionedFile]] instance.
+   * @return Hadoop [[Path]] instance.
+   */
+  def getPathFromPartitionedFile(partitionedFile: PartitionedFile): Path

Review Comment:
   `SparkPath` has a `toPath` function that efficiently returns the `Path` instance in Spark 3.4, so I opted to have two separate APIs here.
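   For illustration, a minimal sketch (not the exact PR code) of how the two per-version implementations could look, assuming, as described above, that `filePath` is a `String` before Spark 3.4 and a `SparkPath` exposing `toPath` from 3.4 on. Each object would live in its own Spark-version module, and the names are illustrative:
   ```scala
   import org.apache.hadoop.fs.Path
   import org.apache.spark.sql.execution.datasources.PartitionedFile

   // Pre-Spark 3.4 module: filePath is a String, so parse it into a Hadoop Path.
   object LegacyPartitionedFileUtilsSketch {
     def getPathFromPartitionedFile(partitionedFile: PartitionedFile): Path =
       new Path(new java.net.URI(partitionedFile.filePath))
   }

   // Spark 3.4+ module: filePath is a SparkPath, which converts directly and cheaply.
   object Spark34PartitionedFileUtilsSketch {
     def getPathFromPartitionedFile(partitionedFile: PartitionedFile): Path =
       partitionedFile.filePath.toPath
   }
   ```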






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224877296


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala:
##
@@ -17,31 +17,30 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.nio.charset.StandardCharsets
 import org.apache.avro.Schema
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.table.HoodieSparkTable
-import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 
+import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 /**
  * Command for adding new columns to the Hudi table.
  */
 case class AlterHoodieTableAddColumnsCommand(
-   tableId: TableIdentifier,
+  tableId: TableIdentifier,

Review Comment:
   Fixed. 






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224780310


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   JIRA to track: HUDI-6347






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224771994


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala:
##
@@ -34,6 +34,15 @@ class HoodieParquetFileFormat extends ParquetFileFormat with 
SparkAdapterSupport
 
   override def toString: String = "Hoodie-Parquet"
 
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
+if (HoodieSparkUtils.gteqSpark3_4) {

Review Comment:
   The tests fail for other Spark versions if I don't add this check.
   ```
   Merge Hudi to Hudi *** FAILED ***
   2023-06-06T23:38:24.7660935Z   org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 0 in stage 3194.0 failed 1 times, most recent 
failure: Lost task 0.0 in stage 3194.0 (TID 3768) (fv-az1128-658 executor 
driver): java.lang.ClassCastException: 
org.apache.spark.sql.vectorized.ColumnarBatchRow cannot be cast to 
org.apache.spark.sql.vectorized.ColumnarBatch
   2023-06-06T23:38:24.7662056Z at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:560)
   2023-06-06T23:38:24.7662628Z at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:549)
   2023-06-06T23:38:24.7663391Z at 
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   ```



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   The tests fail for other Spark versions if I don't add this check.



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   ```
   Merge Hudi to Hudi *** FAILED ***
   2023-06-06T23:38:24.7660935Z   org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 0 in stage 3194.0 failed 1 times, most recent 
failure: Lost task 0.0 in stage 3194.0 (TID 3768) (fv-az1128-658 executor 
driver): java.lang.ClassCastException: 
org.apache.spark.sql.vectorized.ColumnarBatchRow cannot be cast to 
org.apache.spark.sql.vectorized.ColumnarBatch
   2023-06-06T23:38:24.7662056Z at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:560)
   2023-06-06T23:38:24.7662628Z at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:549)
   2023-06-06T23:38:24.7663391Z at 
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   ```




[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224726026


##
hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala:
##
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark33HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way.
+ *
+ * NOTE: This is a version of the [[AvroDeserializer]] impl from Spark 3.2.1 w/ the following changes applied to it:
+ *   - Avoiding appending partition values to the rows read from the data file
+ *   - Schema on-read
+ */
+class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {

Review Comment:
   This is copied from `Spark32PlusHoodieParquetFileFormat`.  Let's take the Spark 3.3.2 improvement in a separate PR and not put everything in the same PR.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224724972


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala:
##
@@ -92,15 +91,16 @@ case class AlterHoodieTableAddColumnsCommand(
   }
 }
 
-object AlterHoodieTableAddColumnsCommand {
+object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport {
   /**
* Generate an empty commit with new schema to change the table's schema.
-   * @param schema The new schema to commit.
-   * @param hoodieCatalogTable  The hoodie catalog table.
-   * @param sparkSession The spark session.
+   *
+   * @param schema The new schema to commit.
+   * @param hoodieCatalogTable The hoodie catalog table.
+   * @param sparkSession   The spark session.
*/
   def commitWithSchema(schema: Schema, hoodieCatalogTable: HoodieCatalogTable,
-  sparkSession: SparkSession): Unit = {
+   sparkSession: SparkSession): Unit = {

Review Comment:
   This is automatically done by the IDE.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224724397


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala:
##
@@ -34,6 +34,15 @@ class HoodieParquetFileFormat extends ParquetFileFormat with 
SparkAdapterSupport
 
   override def toString: String = "Hoodie-Parquet"
 
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
+if (HoodieSparkUtils.gteqSpark3_4) {

Review Comment:
   Let's take this separately since this PR is not related to Spark 3.3.2.  The 
code here is to make sure all other versions maintain the same logic.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224612364


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   I would say let's keep the logic for the other Spark versions as is and not balloon the scope of this PR.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224611690


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   ```
   Merge Hudi to Hudi *** FAILED ***
   2023-06-06T23:38:24.7660935Z   org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 0 in stage 3194.0 failed 1 times, most recent 
failure: Lost task 0.0 in stage 3194.0 (TID 3768) (fv-az1128-658 executor 
driver): java.lang.ClassCastException: 
org.apache.spark.sql.vectorized.ColumnarBatchRow cannot be cast to 
org.apache.spark.sql.vectorized.ColumnarBatch
   2023-06-06T23:38:24.7662056Z at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:560)
   2023-06-06T23:38:24.7662628Z at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:549)
   2023-06-06T23:38:24.7663391Z at 
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   ```






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224610497


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##
@@ -66,17 +66,21 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
   // NOTE: This override has to mirror semantic of whenever this Relation is 
converted into [[HadoopFsRelation]],
   //   which is currently done for all cases, except when Schema Evolution 
is enabled
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
-internalSchemaOpt.isEmpty
+  internalSchemaOpt.isEmpty
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  // Before Spark 3.4.0: PartitioningAwareFileIndex.BASE_PATH_PARAM
+  // Since Spark 3.4.0: FileIndexOptions.BASE_PATH_PARAM
+  val BASE_PATH_PARAM = "basePath"
+
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
 this.copy(prunedDataSchema = Some(prunedSchema))
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
 super.imbueConfigs(sqlContext)
 // TODO Issue with setting this to true in spark 332
-if (!HoodieSparkUtils.gteqSpark3_3_2) {
+if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {

Review Comment:
   The tests fail for other Spark versions if I don't add this check.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224609909


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##
@@ -52,6 +52,31 @@ trait HoodieCatalystPlansUtils {
*/
   def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): 
Join
 
+  /**
+   * Decomposes [[MatchMergeIntoTable]] into its arguments with accommodation for
+   * case class changes of [[MergeIntoTable]] in Spark 3.4.
+   *
+   * Before Spark 3.4.0 (five arguments):
+   *
+   * case class MergeIntoTable(
+   *   targetTable: LogicalPlan,
+   *   sourceTable: LogicalPlan,
+   *   mergeCondition: Expression,
+   *   matchedActions: Seq[MergeAction],
+   *   notMatchedActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery
+   *
+   * Since Spark 3.4.0 (six arguments):
+   *
+   * case class MergeIntoTable(
+   *   targetTable: LogicalPlan,
+   *   sourceTable: LogicalPlan,
+   *   mergeCondition: Expression,
+   *   matchedActions: Seq[MergeAction],
+   *   notMatchedActions: Seq[MergeAction],
+   *   notMatchedBySourceActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery
+   */
+  def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]

Review Comment:
   This is ultimately used in `HoodieAnalysis` to match the plan.
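   For context, a sketch (not the exact PR code) of what the Spark 3.4 side of this adapter could look like, assuming the six-argument `MergeIntoTable` shown in the doc above; the extra `notMatchedBySourceActions` field is simply ignored when extracting the three values `HoodieAnalysis` needs:
   ```scala
   import org.apache.spark.sql.catalyst.expressions.Expression
   import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}

   object Spark34MergeIntoExtractorSketch {
     def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] =
       plan match {
         // Spark 3.4 MergeIntoTable has six fields; only the first three are needed here.
         case MergeIntoTable(targetTable, sourceTable, mergeCondition, _, _, _) =>
           Some((targetTable, sourceTable, mergeCondition))
         case _ => None
       }
   }
   ```
   A rule in `HoodieAnalysis` can then match through this adapter method and stay source-compatible across Spark versions.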






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-09 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1224607155


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkJdbcUtils.scala:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.jdbc.JdbcDialect
+import org.apache.spark.sql.types.StructType
+
+import java.sql.ResultSet
+
+/**
+ * Util functions for JDBC source and tables in Spark.
+ */
+object SparkJdbcUtils {
+  /**
+   * Takes a [[ResultSet]] and returns its Catalyst schema.
+   *
+   * @param resultSet  [[ResultSet]] instance.
+   * @param dialect        [[JdbcDialect]] instance.
+   * @param alwaysNullable If true, all the columns are nullable.
+   * @return A [[StructType]] giving the Catalyst schema.
+   */
+  def getSchema(resultSet: ResultSet,
+                dialect: JdbcDialect,
+                alwaysNullable: Boolean): StructType = {
+    // NOTE: Since Spark 3.4.0, the function signature of [[JdbcUtils.getSchema]] is changed
+    // to have four arguments. Although calling this function in Scala with the first three
+    // arguments works, with the fourth argument using the default value, this breaks the
+    // Java code as Java cannot use the default argument value and has to use four arguments.
+    // Instead of calling this function from Java code directly, we create this util function
+    // in Scala for compatibility in Hudi code.
+    //
+    // Before Spark 3.4.0 (three arguments):
+    // def getSchema(
+    //     resultSet: ResultSet,
+    //     dialect: JdbcDialect,
+    //     alwaysNullable: Boolean = false): StructType
+    //
+    // Since Spark 3.4.0 (four arguments):
+    // def getSchema(
+    //     resultSet: ResultSet,
+    //     dialect: JdbcDialect,
+    //     alwaysNullable: Boolean = false,
+    //     isTimestampNTZ: Boolean = false): StructType
+    JdbcUtils.getSchema(resultSet, dialect, alwaysNullable)

Review Comment:
   This is used by `UtilHelpers.getJDBCSchema` for the JDBC-based schema provider.
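   As a usage sketch (connection details are placeholders, not from the PR), a caller such as a JDBC-based schema provider could obtain the Catalyst schema through this wrapper like so:
   ```scala
   import java.sql.DriverManager
   import org.apache.hudi.SparkJdbcUtils
   import org.apache.spark.sql.jdbc.JdbcDialects
   import org.apache.spark.sql.types.StructType

   object JdbcSchemaFetchSketch {
     def fetchCatalystSchema(url: String, user: String, password: String, table: String): StructType = {
       val conn = DriverManager.getConnection(url, user, password)
       try {
         // An empty result set is enough; only the ResultSet metadata is needed for the schema.
         val rs = conn.createStatement().executeQuery(s"SELECT * FROM $table WHERE 1 = 0")
         val dialect = JdbcDialects.get(url)
         // The wrapper pins all three arguments, so Java callers avoid the default-argument issue.
         SparkJdbcUtils.getSchema(rs, dialect, alwaysNullable = false)
       } finally {
         conn.close()
       }
     }
   }
   ```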






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-08 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1223853653


##
hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala:
##
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.execution.datasources.parquet.Spark34HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way.
+ *
+ * NOTE: This is a version of the [[AvroDeserializer]] impl from Spark 3.2.1 w/ the following changes applied to it:
+ *   - Avoiding appending partition values to the rows read from the data file
+ *   - Schema on-read
+ */
+class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
+  def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    requiredWholeStageCodegenSettings &&
+      supportBatch(sparkSession, schema)
+  }
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SP

[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-08 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1223791778


##
hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java:
##
@@ -35,6 +36,8 @@ public class JsonUtils {
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
   static {
+registerModules(MAPPER);
+

Review Comment:
   This is fixed now.






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-08 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1223651454


##
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala:
##
@@ -56,30 +58,37 @@ class TestIndexSyntax extends HoodieSparkSqlTestBase {
 
 var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName")
 var resolvedLogicalPlan = analyzer.execute(logicalPlan)
-assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table.identifier.quotedString)

Review Comment:
   FR: `table.identifier.quotedString` now also has the catalog name as a prefix.



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##
@@ -733,8 +734,8 @@ object HoodieBaseRelation extends SparkAdapterSupport {
 
 partitionedFile => {
   val hadoopConf = hadoopConfBroadcast.value.get()
-  val reader = new HoodieAvroHFileReader(hadoopConf, new Path(partitionedFile.filePath),
-    new CacheConfig(hadoopConf))
+  val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(partitionedFile)

Review Comment:
   For Reviewer (FR): all the changes in the common module that introduce new adapter support are due to Spark 3.4 class and API changes.



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala:
##
@@ -34,6 +34,15 @@ class HoodieParquetFileFormat extends ParquetFileFormat with 
SparkAdapterSupport
 
   override def toString: String = "Hoodie-Parquet"
 
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {

Review Comment:
   FR: Spark 3.4 now supports the vectorized reader on nested fields.  However, Hudi does not support this yet due to its custom schema evolution logic, so we add logic to override `supportBatch` in `HoodieParquetFileFormat` for Spark 3.4.
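   To make the intent concrete (the quoted diff above is truncated), a sketch of what such an override could look like: on Spark 3.4 it falls back to the pre-3.4 rule of batch-reading only flat, atomic-typed schemas, since Hudi's schema-on-read path cannot consume nested columnar batches yet.
   ```scala
   import org.apache.hudi.HoodieSparkUtils
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
   import org.apache.spark.sql.types.{AtomicType, StructType}

   class HoodieParquetFileFormatSketch extends ParquetFileFormat {
     override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
       if (HoodieSparkUtils.gteqSpark3_4) {
         // Restrict vectorized (batch) reads to flat schemas of atomic types, as pre-3.4 Spark did.
         val conf = sparkSession.sessionState.conf
         conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
       } else {
         super.supportBatch(sparkSession, schema)
       }
     }
   }
   ```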






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-08 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1223580325


##
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala:
##
@@ -38,6 +36,14 @@ object HoodieSpark32CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   case _ => None
 }
 
+  override def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] = {
+    plan match {
+      case MergeIntoTable(targetTable, sourceTable, mergeCondition, _, _) =>
+        Some((targetTable, sourceTable, mergeCondition))

Review Comment:
   The inner pair of parentheses constructs the Scala tuple, which is required here.
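   A tiny self-contained illustration of why the inner parentheses matter; they construct the `Tuple3` that the `Option` wraps:
   ```scala
   object TupleParensSketch {
     // Illustrative values only; in the adapter these are targetTable, sourceTable, mergeCondition.
     val extracted: Option[(String, String, Int)] = Some(("target", "source", 42))
   }
   ```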






[GitHub] [hudi] yihua commented on a diff in pull request #8885: [HUDI-6198] Support Hudi on Spark 3.4.0

2023-06-08 Thread via GitHub


yihua commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1223579632


##
hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java:
##
@@ -35,6 +36,8 @@ public class JsonUtils {
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
   static {
+registerModules(MAPPER);
+

Review Comment:
   #8840 contains some minor improvements that I'm currently not inclined to include.  I'll revise #8840 to contain the necessary fixes and then land it before this PR.


