This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7dad9139653 [HUDI-8197] Get rid of set sqlconf in Filegroupreader
parquet file format (#11928)
7dad9139653 is described below
commit 7dad9139653472126d5ba99daf0c20abfd4d4d85
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 12 15:25:37 2024 -0400
[HUDI-8197] Get rid of set sqlconf in Filegroupreader parquet file format
(#11928)
Get rid of set sqlconf because it has side effects
Also use fgreader for all the schema evolution ds tests
---------
Co-authored-by: Jonathan Vexler <=>
---
.../parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala | 1 -
.../org/apache/hudi/TestAvroSchemaResolutionSupport.scala | 6 +++++-
.../TestHoodieDeltaStreamerSchemaEvolutionBase.java | 10 +++-------
.../TestHoodieDeltaStreamerSchemaEvolutionExtensive.java | 6 +++---
.../TestHoodieDeltaStreamerSchemaEvolutionQuick.java | 2 +-
5 files changed, 12 insertions(+), 13 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 197af83d4e5..6422d39e976 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -87,7 +87,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState:
HoodieTableState,
supportBatchCalled = true
supportBatchResult = !isMOR && !isIncremental && !isBootstrap &&
super.supportBatch(sparkSession, schema)
}
-    sparkSession.conf.set(PARQUET_VECTORIZED_READER_ENABLED.key, supportBatchResult)
supportBatchResult
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 7cc76bea198..06bb8c23d22 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -875,12 +875,16 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
// after implicit type change, read the table with vectorized read enabled
//fg reader with mor does not support vectorized currently and will auto
read by row
- if (HoodieSparkUtils.gteqSpark3_3 && !useFileGroupReader) {
+ if (HoodieSparkUtils.gteqSpark3_3 && (isCow || !useFileGroupReader)) {
assertThrows(classOf[SparkException]){
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"true") {
readTable(tempRecordPath, useFileGroupReader)
}
}
+ } else {
+      withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
+ readTable(tempRecordPath, useFileGroupReader)
+ }
}
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" ->
"false") {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 945f64ece6e..bc26578f8df 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -25,7 +25,6 @@ import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -103,12 +102,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected boolean useTransformer;
protected boolean userProvidedSchema;
- protected Map<String, String> readOpts = new HashMap<String, String>() {
- {
- put(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
- }
- };
-
@BeforeAll
public static void initKafka() {
defaultSchemaProviderClassName = TestSchemaProvider.class.getName();
@@ -125,6 +118,9 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
sourceSchemaFile = "";
targetSchemaFile = "";
topicName = "topic" + testNum;
+ if (HoodieSparkUtils.gteqSpark3_3()) {
+      sparkSession.conf().set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false");
+ }
}
@AfterEach
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index d152acae2a2..b9925dfbf3d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -170,7 +170,7 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
}
assertRecordCount(numRecords);
-    Dataset<Row> df = sparkSession.read().format("hudi").options(readOpts).load(tableBasePath);
+ Dataset<Row> df = sparkSession.read().format("hudi").load(tableBasePath);
df.show(9,false);
df.select(updateColumn).show(9);
for (String condition : conditions.keySet()) {
@@ -442,7 +442,7 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
protected String typePromoUpdates;
protected void assertDataType(String colName, DataType expectedType) {
-    assertEquals(expectedType, sparkSession.read().format("hudi").options(readOpts).load(tableBasePath).select(colName).schema().fields()[0].dataType());
+    assertEquals(expectedType, sparkSession.read().format("hudi").load(tableBasePath).select(colName).schema().fields()[0].dataType());
}
protected void testTypePromotionBase(String colName, DataType startType,
DataType updateType) throws Exception {
@@ -499,7 +499,7 @@ public class
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
assertFileNumber(numFiles, false);
}
assertRecordCount(numRecords);
-    sparkSession.read().format("hudi").options(readOpts).load(tableBasePath).select(colName).show(9);
+    sparkSession.read().format("hudi").load(tableBasePath).select(colName).show(9);
assertDataType(colName, endType);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index a108dec74b1..652ed0f5e5d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -241,7 +241,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
}
assertRecordCount(numRecords);
-      df = sparkSession.read().format("hudi").options(readOpts).load(tableBasePath);
+ df = sparkSession.read().format("hudi").load(tableBasePath);
df.show(100,false);
df.cache();
assertDataType(df, "tip_history",
DataTypes.createArrayType(DataTypes.LongType));