This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 43569426895d chore: Test Runtime Improvements: lower number of files, parallelize reads (#17671)
43569426895d is described below

commit 43569426895d68f9e411e542f3c535d2dca00236
Author: Tim Brown <[email protected]>
AuthorDate: Wed Dec 24 01:27:54 2025 -0500

    chore: Test Runtime Improvements: lower number of files, parallelize reads (#17671)
    
    * lower number of files, parallelize reads
    
    * avoid corrupt file logs for col stats
    
    * close clients, cleanup signature, use simple index by default
    
    * Revert "close clients, cleanup signature, use simple index by default"
    
    This reverts commit 678f3ba5e495ef34598ebe78f4046de277337e53.
    
    * close client, avoid show
    
    * avoid some noisy logs
    
    * use simple index
    
    * further reduce sleep
---
 .../hudi/testutils/HoodieMergeOnReadTestUtils.java |  52 ++++++-----
 .../hudi/utils/HoodieWriterClientTestHarness.java  |   6 +-
 .../testutils/HoodieSparkClientTestHarness.java    |   6 --
 .../hudi/client/TestHoodieClientMultiWriter.java   |   2 +-
 .../hudi/functional/TestConsistentBucketIndex.java |   2 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |   9 +-
 .../hudi/functional/TestHoodieFileSystemViews.java |   4 +
 .../TestHoodieSparkMergeOnReadTableCompaction.java | 103 ++++++++++-----------
 .../TestMetadataTableWithSparkDataSource.scala     |  10 +-
 .../hudi/functional/TestRecordLevelIndex.scala     |  16 +++-
 10 files changed, 108 insertions(+), 102 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index cd90c8d2613a..9b2cb5a64fcc 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -65,8 +67,7 @@ public class HoodieMergeOnReadTestUtils {
 
   public static List<GenericRecord> 
getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> 
inputPaths,
                                                                String 
basePath, JobConf jobConf, boolean realtime, boolean populateMetaFields) {
-    Schema schema = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-    return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, 
realtime, schema,
+    return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, 
realtime, HoodieTestDataGenerator.AVRO_SCHEMA,
         HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new 
ArrayList<>(), populateMetaFields);
   }
 
@@ -104,33 +105,36 @@ public class HoodieMergeOnReadTestUtils {
         .map(HoodieAvroUtils::createNewSchemaField)
         .collect(Collectors.toList()));
 
-    List<GenericRecord> records = new ArrayList<>();
     try {
       FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
       InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());
-
-      for (InputSplit split : splits) {
-        RecordReader recordReader = inputFormat.getRecordReader(split, 
jobConf, null);
-        Object key = recordReader.createKey();
-        ArrayWritable writable = (ArrayWritable) recordReader.createValue();
-        while (recordReader.next(key, writable)) {
-          GenericRecordBuilder newRecord = new 
GenericRecordBuilder(projectedSchema);
-          // writable returns an array with [field1, field2, 
_hoodie_commit_time,
-          // _hoodie_commit_seqno]
-          Writable[] values = writable.get();
-          schema.getFields().stream()
-              .filter(f -> !projectCols || projectedColumns.contains(f.name()))
-              .map(f -> Pair.of(projectedSchema.getFields().stream()
-                  .filter(p -> f.name().equals(p.name())).findFirst().get(), 
f))
-              .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), 
values[fieldsPair.getValue().pos()]));
-          records.add(newRecord.build());
+      return Arrays.stream(splits).parallel().flatMap(split -> {
+        List<GenericRecord> records = new ArrayList<>();
+        try {
+          RecordReader recordReader = inputFormat.getRecordReader(split, 
jobConf, null);
+          Object key = recordReader.createKey();
+          ArrayWritable writable = (ArrayWritable) recordReader.createValue();
+          while (recordReader.next(key, writable)) {
+            GenericRecordBuilder newRecord = new 
GenericRecordBuilder(projectedSchema);
+            // writable returns an array with [field1, field2, 
_hoodie_commit_time,
+            // _hoodie_commit_seqno]
+            Writable[] values = writable.get();
+            schema.getFields().stream()
+                .filter(f -> !projectCols || 
projectedColumns.contains(f.name()))
+                .map(f -> Pair.of(projectedSchema.getFields().stream()
+                    .filter(p -> f.name().equals(p.name())).findFirst().get(), 
f))
+                .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), 
values[fieldsPair.getValue().pos()]));
+            records.add(newRecord.build());
+          }
+          recordReader.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
         }
-        recordReader.close();
-      }
-    } catch (IOException ie) {
-      log.error("Read records error", ie);
+        return records.stream();
+      }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
     }
-    return records;
   }
 
   private static void setPropsForInputFormat(FileInputFormat inputFormat, 
JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, 
List<String> projectedCols,
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 82320cd5bf41..56732b995ec5 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -237,7 +237,7 @@ public abstract class HoodieWriterClientTestHarness extends 
HoodieCommonTestHarn
    * @return Config Builder
    */
   public HoodieWriteConfig.Builder 
getConfigBuilder(HoodieFailedWritesCleaningPolicy cleaningPolicy) {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 
HoodieIndex.IndexType.BLOOM, cleaningPolicy);
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, 
HoodieIndex.IndexType.SIMPLE, cleaningPolicy);
   }
 
   /**
@@ -250,7 +250,7 @@ public abstract class HoodieWriterClientTestHarness extends 
HoodieCommonTestHarn
   }
 
   public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    return getConfigBuilder(schemaStr, HoodieIndex.IndexType.SIMPLE, 
HoodieFailedWritesCleaningPolicy.EAGER);
   }
 
   public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, 
HoodieIndex.IndexType indexType) {
@@ -535,7 +535,7 @@ public abstract class HoodieWriterClientTestHarness extends 
HoodieCommonTestHarn
   }
 
   protected HoodieWriteConfig getSmallInsertWriteConfigForMDT(int 
insertSplitSize, String schemaStr, long smallFileSize, boolean 
mergeAllowDuplicateInserts) {
-    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, 
HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, 
HoodieIndex.IndexType.SIMPLE, HoodieFailedWritesCleaningPolicy.EAGER);
     return builder.withCompactionConfig(
                     HoodieCompactionConfig.newBuilder()
                             .compactionSmallFileSize(smallFileSize)
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 1b945b4e837c..97bd0e1175d7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -621,12 +621,6 @@ public abstract class HoodieSparkClientTestHarness extends 
HoodieWriterClientTes
         tableView.getAllFileGroups(partition).collect(Collectors.toList());
     
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
 
-    fileGroups.forEach(g -> log.info(g.toString()));
-    fileGroups.forEach(g -> g.getAllBaseFiles()
-        .forEach(b -> log.info(b.toString())));
-    fileGroups.forEach(g -> g.getAllFileSlices()
-        .forEach(s -> log.info(s.toString())));
-
     long numFiles = fileGroups.stream()
         .mapToLong(g -> g.getAllBaseFiles().count()
             + g.getAllFileSlices().mapToLong(s -> 
s.getLogFiles().count()).sum())
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 4bf1c2de2860..a4e051d6f23e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -457,7 +457,7 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
       setUpMORTestTable();
     }
 
-    int heartBeatIntervalForCommit4 = 10 * 1000;
+    int heartBeatIntervalForCommit4 = 3 * 1000;
 
     HoodieWriteConfig writeConfig;
     TestingServer server = null;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
index d44eca9a969b..560e46cee330 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
@@ -113,7 +113,7 @@ public class TestConsistentBucketIndex extends 
HoodieSparkClientTestHarness {
             .withIndexType(HoodieIndex.IndexType.BUCKET)
             .withIndexKeyField("_row_key")
             
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
-            .withBucketNum("8")
+            .withBucketNum("4")
             .build())
         .build();
     writeClient = getHoodieWriteClient(config);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 8d51c21fbcf7..517ac117e51b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -251,7 +251,12 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     // bootstrap with few commits
     doPreBootstrapOperations(testTable);
 
-    writeConfig = getWriteConfig(true, true);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMetadataIndexColumnStats(false) // No real files written so 
avoid stats
+            .build())
+        .build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
     syncTableMetadata(writeConfig);
     validateMetadata(testTable);
@@ -576,6 +581,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .enable(true)
             .enableMetrics(false)
+            .withMetadataIndexColumnStats(false)
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .build())
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -3111,6 +3117,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
                     .enableMetrics(false)
                     .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommits 
- 1)
                     .withMaxNumDeltacommitsWhenPending(maxNumDeltaCommits)
+                    .withMetadataIndexColumnStats(false) // no real files so 
avoid reading column stats
                     .build())
             .build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
index 6807f6cbcbb2..387a87b902fa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
@@ -91,6 +91,10 @@ public class TestHoodieFileSystemViews extends 
HoodieClientTestBase {
     for (HoodieTableType tableType : HoodieTableType.values()) {
       for (boolean enableMdt : Arrays.asList(true, false)) {
         for (FileSystemViewStorageType viewStorageType : 
Arrays.asList(FileSystemViewStorageType.MEMORY, 
FileSystemViewStorageType.SPILLABLE_DISK)) {
+          if (!enableMdt && viewStorageType == 
FileSystemViewStorageType.MEMORY) {
+            // This is the baseline case, no need to test here.
+            continue;
+          }
           for (int writerVersion : Arrays.asList(6, 8)) {
             testCases.add(Arguments.of(tableType, enableMdt, viewStorageType, 
writerVersion));
           }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index f9f999ee3118..960b98eb73e2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -26,7 +26,6 @@ import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -141,8 +140,6 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .compactionSmallFileSize(0)
             .build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder()
-            .parquetMaxFileSize(1024).build())
         .withLayoutConfig(layoutConfig)
         
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(indexType).withBucketNum("1").build())
         .withMarkersTimelineServerBasedBatchIntervalMs(10)
@@ -183,59 +180,53 @@ public class TestHoodieSparkMergeOnReadTableCompaction 
extends SparkClientFuncti
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, 
boolean enableTimelineServer) throws IOException {
-    try {
-      //disable for this test because it seems like we process mor in a 
different order?
-      
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
 "false");
-      Properties props = getPropertiesForKeyGen(true);
-      HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-          .forTable("test-trip-table")
-          .withPath(basePath())
-          .withSchema(TRIP_EXAMPLE_SCHEMA)
-          .withParallelism(2, 2)
-            .withEmbeddedTimelineServerEnabled(enableTimelineServer)
-          
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
-          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-              .withMaxNumDeltaCommitsBeforeCompaction(1).build())
-          .withLayoutConfig(HoodieLayoutConfig.newBuilder()
-              .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-              
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-          
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
-          .withMarkersTimelineServerBasedBatchIntervalMs(10)
-          .build();
-      props.putAll(config.getProps());
-
-      metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
-      client = getHoodieWriteClient(config);
-
-      final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
-      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
-
-      // initialize 100 records
-      String commit1 = client.startCommit();
-      JavaRDD writeStatuses = client.upsert(writeRecords, commit1);
-      client.commit(commit1, writeStatuses);
-
-      // update 100 records
-      String commit2 = client.startCommit();
-      writeStatuses = client.upsert(writeRecords, commit2);
-      client.commit(commit2, writeStatuses);
-      // schedule compaction
-      client.scheduleCompaction(Option.empty());
-      // delete 50 records
-      List<HoodieKey> toBeDeleted = 
records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
-      JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
-      String commit3 = client.startCommit();
-      writeStatuses = client.delete(deleteRecords, commit3);
-      client.commit(commit3, writeStatuses);
-
-      // insert the same 100 records again
-      String commit4 = client.startCommit();
-      writeStatuses = client.upsert(writeRecords, commit4);
-      client.commit(commit4, writeStatuses);
-      assertEquals(100, readTableTotalRecordsNum());
-    } finally {
-      
jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
 "true");
-    }
+    Properties props = getPropertiesForKeyGen(true);
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .forTable("test-trip-table")
+        .withPath(basePath())
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+          .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+            
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withMarkersTimelineServerBasedBatchIntervalMs(10)
+        .build();
+    props.putAll(config.getProps());
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
+    client = getHoodieWriteClient(config);
+
+    final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+    // initialize 100 records
+    String commit1 = client.startCommit();
+    JavaRDD writeStatuses = client.upsert(writeRecords, commit1);
+    client.commit(commit1, writeStatuses);
+
+    // update 100 records
+    String commit2 = client.startCommit();
+    writeStatuses = client.upsert(writeRecords, commit2);
+    client.commit(commit2, writeStatuses);
+    // schedule compaction
+    client.scheduleCompaction(Option.empty());
+    // delete 50 records
+    List<HoodieKey> toBeDeleted = 
records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+    JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+    String commit3 = client.startCommit();
+    writeStatuses = client.delete(deleteRecords, commit3);
+    client.commit(commit3, writeStatuses);
+
+    // insert the same 100 records again
+    String commit4 = client.startCommit();
+    writeStatuses = client.upsert(writeRecords, commit4);
+    client.commit(commit4, writeStatuses);
+    assertEquals(100, readTableTotalRecordsNum());
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index feccc6b4377b..987120e45ea9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -42,7 +42,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.functions.{col, explode}
 import org.junit.jupiter.api.{Disabled, Tag, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
@@ -121,7 +121,7 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
     // Files partition of MT
     val filesPartitionDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
     // Smoke test
-    filesPartitionDF.show()
+    assertEquals(4, filesPartitionDF.collect().length)
 
     // Query w/ 0 requested columns should be working fine
     assertEquals(4, filesPartitionDF.count())
@@ -138,7 +138,7 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
     // Column Stats Index partition of MT
     val colStatsDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
     // Smoke test
-    colStatsDF.show()
+    assertDoesNotThrow(() => colStatsDF.collect())
 
     // lets pick one data file and validate col stats
     val partitionPathToTest = "2015/03/16"
@@ -175,7 +175,7 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
     // Files partition of MT
     val filesPartitionDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
     // Smoke test
-    filesPartitionDF.show()
+    assertEquals(2, filesPartitionDF.collect().length)
 
     // Query w/ 0 requested columns should be working fine
     assertEquals(2, filesPartitionDF.count())
@@ -192,7 +192,7 @@ class TestMetadataTableWithSparkDataSource extends 
SparkClientFunctionalTestHarn
     // Column Stats Index partition of MT
     val colStatsDF = 
spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
     // Smoke test
-    colStatsDF.show()
+    assertDoesNotThrow(() => colStatsDF.collect())
 
     // lets pick one data file and validate col stats
     val partitionPathToTest = ""
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 76685bf7f42b..475a5bf55d23 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -204,8 +204,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     val holder = new testRecordLevelIndexHolder
     testRecordLevelIndex(testCase.tableType, testCase.streamingWriteEnabled, 
holder)
     val writeConfig = getWriteConfig(holder.options)
-    new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
-      
.rollback(metaClient.getActiveTimeline.lastInstant().get().requestedTime())
+    val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(jsc), writeConfig)
+    
writeClient.rollback(metaClient.getActiveTimeline.lastInstant().get().requestedTime())
+    writeClient.close()
     val metadata = metadataWriter(writeConfig).getTableMetadata
     try {
       val partition0Locations = readRecordIndex(metadata, 
holder.bulkRecordKeys, HOption.of("partition0"))
@@ -232,6 +233,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     assertEquals("compaction", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
     metadata = metadataWriter(writeConfig).getTableMetadata
     doAllAssertions(holder, metadata)
+    writeClient.close()
   }
 
   @ParameterizedTest
@@ -252,10 +254,11 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
     assertEquals("replacecommit", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
     metadata = metadataWriter(writeConfig).getTableMetadata
     doAllAssertions(holder, metadata)
+    writeClient.close()
   }
 
   private def validateDFWithLocations(df: Array[Row], locations: Map[String, 
HoodieRecordGlobalLocation],
-                                       partition: String): Unit = {
+                                      partition: String): Unit = {
     var count: Int = 0
     for (row <- df) {
       val recordKey = row.getString(2)
@@ -391,8 +394,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
           .mapAsJavaMapConverter(options ++ 
Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key() -> 
latestTableSchemaFromCommitMetadata.get().toString)).asJava))
         .withPath(basePath)
         .build()
-      new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
-        .rollback(lastInstant.requestedTime())
+      val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(jsc), writeConfig)
+      writeClient.rollback(lastInstant.requestedTime())
+      writeClient.close()
     }
 
     if (compact) {
@@ -406,6 +410,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
       writeClient.compact(timeOpt.get())
       metaClient.reloadActiveTimeline()
       assertEquals("compaction", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
+      writeClient.close()
     }
 
     if (cluster) {
@@ -418,6 +423,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
       writeClient.cluster(timeOpt.get())
       metaClient.reloadActiveTimeline()
       assertEquals("replacecommit", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
+      writeClient.close()
     }
 
     //init mdt

Reply via email to