This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 43569426895d chore: Test Runtime Improvements: lower number of files, parallelize reads (#17671)
43569426895d is described below
commit 43569426895d68f9e411e542f3c535d2dca00236
Author: Tim Brown <[email protected]>
AuthorDate: Wed Dec 24 01:27:54 2025 -0500
chore: Test Runtime Improvements: lower number of files, parallelize reads (#17671)
* lower number of files, parallelize reads
* avoid corrupt file logs for col stats
* close clients, cleanup signature, use simple index by default
* Revert "close clients, cleanup signature, use simple index by default"
This reverts commit 678f3ba5e495ef34598ebe78f4046de277337e53.
* close client, avoid show
* avoid some noisy logs
* use simple index
* further reduce sleep
---
.../hudi/testutils/HoodieMergeOnReadTestUtils.java | 52 ++++++-----
.../hudi/utils/HoodieWriterClientTestHarness.java | 6 +-
.../testutils/HoodieSparkClientTestHarness.java | 6 --
.../hudi/client/TestHoodieClientMultiWriter.java | 2 +-
.../hudi/functional/TestConsistentBucketIndex.java | 2 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 9 +-
.../hudi/functional/TestHoodieFileSystemViews.java | 4 +
.../TestHoodieSparkMergeOnReadTableCompaction.java | 103 ++++++++++-----------
.../TestMetadataTableWithSparkDataSource.scala | 10 +-
.../hudi/functional/TestRecordLevelIndex.scala | 16 +++-
10 files changed, 108 insertions(+), 102 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index cd90c8d2613a..9b2cb5a64fcc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -65,8 +67,7 @@ public class HoodieMergeOnReadTestUtils {
   public static List<GenericRecord> getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> inputPaths,
                                                                 String basePath, JobConf jobConf, boolean realtime, boolean populateMetaFields) {
-    Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-    return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, schema,
+    return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, HoodieTestDataGenerator.AVRO_SCHEMA,
         HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>(), populateMetaFields);
}
@@ -104,33 +105,36 @@ public class HoodieMergeOnReadTestUtils {
.map(HoodieAvroUtils::createNewSchemaField)
.collect(Collectors.toList()));
- List<GenericRecord> records = new ArrayList<>();
try {
FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());
-
-      for (InputSplit split : splits) {
-        RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
-        Object key = recordReader.createKey();
-        ArrayWritable writable = (ArrayWritable) recordReader.createValue();
-        while (recordReader.next(key, writable)) {
-          GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
-          // writable returns an array with [field1, field2, _hoodie_commit_time,
-          // _hoodie_commit_seqno]
-          Writable[] values = writable.get();
-          schema.getFields().stream()
-              .filter(f -> !projectCols || projectedColumns.contains(f.name()))
-              .map(f -> Pair.of(projectedSchema.getFields().stream()
-                  .filter(p -> f.name().equals(p.name())).findFirst().get(), f))
-              .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
- records.add(newRecord.build());
+ return Arrays.stream(splits).parallel().flatMap(split -> {
+ List<GenericRecord> records = new ArrayList<>();
+ try {
+          RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
+          Object key = recordReader.createKey();
+          ArrayWritable writable = (ArrayWritable) recordReader.createValue();
+          while (recordReader.next(key, writable)) {
+            GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
+            // writable returns an array with [field1, field2, _hoodie_commit_time,
+            // _hoodie_commit_seqno]
+            Writable[] values = writable.get();
+            schema.getFields().stream()
+                .filter(f -> !projectCols || projectedColumns.contains(f.name()))
+                .map(f -> Pair.of(projectedSchema.getFields().stream()
+                    .filter(p -> f.name().equals(p.name())).findFirst().get(), f))
+                .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
+ records.add(newRecord.build());
+ }
+ recordReader.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
- recordReader.close();
- }
- } catch (IOException ie) {
- log.error("Read records error", ie);
+ return records.stream();
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
- return records;
}
  private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols,
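
The hunk above replaces the sequential per-split RecordReader loop with a parallel stream over the input splits and rethrows IOException as UncheckedIOException so the checked exception can escape the lambda. A minimal, self-contained sketch of that pattern (illustration only, not the Hudi code; the Split type and readAll() helper are hypothetical stand-ins for the Hadoop InputSplit/RecordReader plumbing):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelSplitReadSketch {
  // Hypothetical stand-in for an input split; the real utility reads
  // org.apache.hadoop.mapred.InputSplit instances via a RecordReader.
  interface Split {
    List<String> readAll() throws IOException;
  }

  static List<String> readAllSplits(Split[] splits) {
    // Read splits in parallel and flatten the per-split results into one list,
    // mirroring the Arrays.stream(splits).parallel().flatMap(...) change above.
    return Arrays.stream(splits).parallel()
        .flatMap(split -> {
          try {
            return split.readAll().stream();
          } catch (IOException e) {
            // Checked exceptions cannot escape the lambda, so wrap them.
            throw new UncheckedIOException(e);
          }
        })
        .collect(Collectors.toList());
  }
}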
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 82320cd5bf41..56732b995ec5 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -237,7 +237,7 @@ public abstract class HoodieWriterClientTestHarness extends HoodieCommonTestHarn
* @return Config Builder
*/
   public HoodieWriteConfig.Builder getConfigBuilder(HoodieFailedWritesCleaningPolicy cleaningPolicy) {
-    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, cleaningPolicy);
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.SIMPLE, cleaningPolicy);
}
/**
@@ -250,7 +250,7 @@ public abstract class HoodieWriterClientTestHarness extends HoodieCommonTestHarn
}
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
-    return getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);
+    return getConfigBuilder(schemaStr, HoodieIndex.IndexType.SIMPLE, HoodieFailedWritesCleaningPolicy.EAGER);
}
  public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex.IndexType indexType) {
@@ -535,7 +535,7 @@ public abstract class HoodieWriterClientTestHarness extends HoodieCommonTestHarn
}
   protected HoodieWriteConfig getSmallInsertWriteConfigForMDT(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
-    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, HoodieIndex.IndexType.SIMPLE, HoodieFailedWritesCleaningPolicy.EAGER);
return builder.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileSize)
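
For context on the BLOOM-to-SIMPLE default switch above: the index type reaches the writer through HoodieIndexConfig on the write config. A rough sketch of building such a config directly, under the assumption that only path, schema, and index type matter for the example (the harness's getConfigBuilder methods wrap this up with many more settings):

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class SimpleIndexConfigSketch {
  static HoodieWriteConfig buildConfig(String basePath, String schema) {
    // SIMPLE index tags incoming records by scanning existing file keys directly,
    // which tends to be cheaper than bloom-filter lookups on the tiny tables these
    // tests create; that is the motivation for the default change above.
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schema)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.SIMPLE)
            .build())
        .build();
  }
}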
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 1b945b4e837c..97bd0e1175d7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -621,12 +621,6 @@ public abstract class HoodieSparkClientTestHarness extends HoodieWriterClientTes
tableView.getAllFileGroups(partition).collect(Collectors.toList());
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
- fileGroups.forEach(g -> log.info(g.toString()));
- fileGroups.forEach(g -> g.getAllBaseFiles()
- .forEach(b -> log.info(b.toString())));
- fileGroups.forEach(g -> g.getAllFileSlices()
- .forEach(s -> log.info(s.toString())));
-
long numFiles = fileGroups.stream()
.mapToLong(g -> g.getAllBaseFiles().count()
            + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 4bf1c2de2860..a4e051d6f23e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -457,7 +457,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
setUpMORTestTable();
}
- int heartBeatIntervalForCommit4 = 10 * 1000;
+ int heartBeatIntervalForCommit4 = 3 * 1000;
HoodieWriteConfig writeConfig;
TestingServer server = null;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
index d44eca9a969b..560e46cee330 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestConsistentBucketIndex.java
@@ -113,7 +113,7 @@ public class TestConsistentBucketIndex extends HoodieSparkClientTestHarness {
.withIndexType(HoodieIndex.IndexType.BUCKET)
.withIndexKeyField("_row_key")
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
- .withBucketNum("8")
+ .withBucketNum("4")
.build())
.build();
writeClient = getHoodieWriteClient(config);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 8d51c21fbcf7..517ac117e51b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -251,7 +251,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// bootstrap with few commits
doPreBootstrapOperations(testTable);
- writeConfig = getWriteConfig(true, true);
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+            .withMetadataIndexColumnStats(false) // No real files written so avoid stats
+ .build())
+ .build();
initWriteConfigAndMetatableWriter(writeConfig, true);
syncTableMetadata(writeConfig);
validateMetadata(testTable);
@@ -576,6 +581,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.enableMetrics(false)
+ .withMetadataIndexColumnStats(false)
.withMaxNumDeltaCommitsBeforeCompaction(1)
.build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -3111,6 +3117,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.enableMetrics(false)
                .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommits - 1)
.withMaxNumDeltacommitsWhenPending(maxNumDeltaCommits)
+                .withMetadataIndexColumnStats(false) // no real files so avoid reading column stats
.build())
.build();
initWriteConfigAndMetatableWriter(writeConfig, true);
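
The recurring change in this file keeps the metadata table enabled while turning off the column-stats index, since these tests log commits without writing real data files and column stats would only produce the noisy corrupt-file logs mentioned in the commit message. A small sketch of that config, using only builder calls that appear in the diff above:

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataConfigSketch {
  static HoodieMetadataConfig metadataWithoutColumnStats() {
    // Metadata table stays on, but the column-stats index is skipped because the
    // test table has no real base files to collect statistics from.
    return HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(false)
        .build();
  }
}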
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
index 6807f6cbcbb2..387a87b902fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
@@ -91,6 +91,10 @@ public class TestHoodieFileSystemViews extends HoodieClientTestBase {
for (HoodieTableType tableType : HoodieTableType.values()) {
for (boolean enableMdt : Arrays.asList(true, false)) {
         for (FileSystemViewStorageType viewStorageType : Arrays.asList(FileSystemViewStorageType.MEMORY, FileSystemViewStorageType.SPILLABLE_DISK)) {
+          if (!enableMdt && viewStorageType == FileSystemViewStorageType.MEMORY) {
+ // This is the baseline case, no need to test here.
+ continue;
+ }
for (int writerVersion : Arrays.asList(6, 8)) {
            testCases.add(Arguments.of(tableType, enableMdt, viewStorageType, writerVersion));
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index f9f999ee3118..960b98eb73e2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -26,7 +26,6 @@ import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -141,8 +140,6 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
.withMaxNumDeltaCommitsBeforeCompaction(1)
.compactionSmallFileSize(0)
.build())
- .withStorageConfig(HoodieStorageConfig.newBuilder()
- .parquetMaxFileSize(1024).build())
.withLayoutConfig(layoutConfig)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(indexType).withBucketNum("1").build())
.withMarkersTimelineServerBasedBatchIntervalMs(10)
@@ -183,59 +180,53 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
@ParameterizedTest
@MethodSource("writeLogTest")
  public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
- try {
-      //disable for this test because it seems like we process mor in a different order?
-      jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
- Properties props = getPropertiesForKeyGen(true);
- HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
- .forTable("test-trip-table")
- .withPath(basePath())
- .withSchema(TRIP_EXAMPLE_SCHEMA)
- .withParallelism(2, 2)
- .withEmbeddedTimelineServerEnabled(enableTimelineServer)
-          .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(1).build())
- .withLayoutConfig(HoodieLayoutConfig.newBuilder()
- .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-              .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-          .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
- .withMarkersTimelineServerBasedBatchIntervalMs(10)
- .build();
- props.putAll(config.getProps());
-
- metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
- client = getHoodieWriteClient(config);
-
- final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
- JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
-
- // initialize 100 records
- String commit1 = client.startCommit();
- JavaRDD writeStatuses = client.upsert(writeRecords, commit1);
- client.commit(commit1, writeStatuses);
-
- // update 100 records
- String commit2 = client.startCommit();
- writeStatuses = client.upsert(writeRecords, commit2);
- client.commit(commit2, writeStatuses);
- // schedule compaction
- client.scheduleCompaction(Option.empty());
- // delete 50 records
-      List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
- JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
- String commit3 = client.startCommit();
- writeStatuses = client.delete(deleteRecords, commit3);
- client.commit(commit3, writeStatuses);
-
- // insert the same 100 records again
- String commit4 = client.startCommit();
- writeStatuses = client.upsert(writeRecords, commit4);
- client.commit(commit4, writeStatuses);
- assertEquals(100, readTableTotalRecordsNum());
- } finally {
-      jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
- }
+ Properties props = getPropertiesForKeyGen(true);
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .forTable("test-trip-table")
+ .withPath(basePath())
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+ .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+ .withMarkersTimelineServerBasedBatchIntervalMs(10)
+ .build();
+ props.putAll(config.getProps());
+
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
+ client = getHoodieWriteClient(config);
+
+ final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+ // initialize 100 records
+ String commit1 = client.startCommit();
+ JavaRDD writeStatuses = client.upsert(writeRecords, commit1);
+ client.commit(commit1, writeStatuses);
+
+ // update 100 records
+ String commit2 = client.startCommit();
+ writeStatuses = client.upsert(writeRecords, commit2);
+ client.commit(commit2, writeStatuses);
+ // schedule compaction
+ client.scheduleCompaction(Option.empty());
+ // delete 50 records
+    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+ JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+ String commit3 = client.startCommit();
+ writeStatuses = client.delete(deleteRecords, commit3);
+ client.commit(commit3, writeStatuses);
+
+ // insert the same 100 records again
+ String commit4 = client.startCommit();
+ writeStatuses = client.upsert(writeRecords, commit4);
+ client.commit(commit4, writeStatuses);
+ assertEquals(100, readTableTotalRecordsNum());
}
/**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index feccc6b4377b..987120e45ea9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -42,7 +42,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, explode}
import org.junit.jupiter.api.{Disabled, Tag, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
@@ -121,7 +121,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
// Files partition of MT
    val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
// Smoke test
- filesPartitionDF.show()
+ assertEquals(4, filesPartitionDF.collect().length)
// Query w/ 0 requested columns should be working fine
assertEquals(4, filesPartitionDF.count())
@@ -138,7 +138,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
// Column Stats Index partition of MT
    val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
// Smoke test
- colStatsDF.show()
+ assertDoesNotThrow(() => colStatsDF.collect())
// lets pick one data file and validate col stats
val partitionPathToTest = "2015/03/16"
@@ -175,7 +175,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
     // Files partition of MT
     val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
// Smoke test
- filesPartitionDF.show()
+ assertEquals(2, filesPartitionDF.collect().length)
// Query w/ 0 requested columns should be working fine
assertEquals(2, filesPartitionDF.count())
@@ -192,7 +192,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
     // Column Stats Index partition of MT
     val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
// Smoke test
- colStatsDF.show()
+ assertDoesNotThrow(() => colStatsDF.collect())
// lets pick one data file and validate col stats
val partitionPathToTest = ""
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 76685bf7f42b..475a5bf55d23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -204,8 +204,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
val holder = new testRecordLevelIndexHolder
    testRecordLevelIndex(testCase.tableType, testCase.streamingWriteEnabled, holder)
val writeConfig = getWriteConfig(holder.options)
- new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
-      .rollback(metaClient.getActiveTimeline.lastInstant().get().requestedTime())
+    val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
+    writeClient.rollback(metaClient.getActiveTimeline.lastInstant().get().requestedTime())
+ writeClient.close()
val metadata = metadataWriter(writeConfig).getTableMetadata
try {
      val partition0Locations = readRecordIndex(metadata, holder.bulkRecordKeys, HOption.of("partition0"))
@@ -232,6 +233,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
     assertEquals("compaction", metaClient.getActiveTimeline.lastInstant().get().getAction)
metadata = metadataWriter(writeConfig).getTableMetadata
doAllAssertions(holder, metadata)
+ writeClient.close()
}
@ParameterizedTest
@@ -252,10 +254,11 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
     assertEquals("replacecommit", metaClient.getActiveTimeline.lastInstant().get().getAction)
metadata = metadataWriter(writeConfig).getTableMetadata
doAllAssertions(holder, metadata)
+ writeClient.close()
}
   private def validateDFWithLocations(df: Array[Row], locations: Map[String, HoodieRecordGlobalLocation],
- partition: String): Unit = {
+ partition: String): Unit = {
var count: Int = 0
for (row <- df) {
val recordKey = row.getString(2)
@@ -391,8 +394,9 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
           .mapAsJavaMapConverter(options ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key() -> latestTableSchemaFromCommitMetadata.get().toString)).asJava))
.withPath(basePath)
.build()
- new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
- .rollback(lastInstant.requestedTime())
+      val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
+ writeClient.rollback(lastInstant.requestedTime())
+ writeClient.close()
}
if (compact) {
@@ -406,6 +410,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
writeClient.compact(timeOpt.get())
metaClient.reloadActiveTimeline()
assertEquals("compaction",
metaClient.getActiveTimeline.lastInstant().get().getAction)
+ writeClient.close()
}
if (cluster) {
@@ -418,6 +423,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
writeClient.cluster(timeOpt.get())
metaClient.reloadActiveTimeline()
assertEquals("replacecommit",
metaClient.getActiveTimeline.lastInstant().get().getAction)
+ writeClient.close()
}
//init mdt
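
The TestRecordLevelIndex changes above follow one pattern: each SparkRDDWriteClient a test constructs is held in a local and explicitly closed once its rollback/compact/cluster call returns, so resources started by the client (such as the embedded timeline service) do not linger across tests. A minimal Java sketch of that discipline, assuming an engine context, write config, and instant time are already available (the tests obtain the instant from metaClient):

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

public class CloseWriteClientSketch {
  static void rollbackLastInstant(HoodieSparkEngineContext context,
                                  HoodieWriteConfig writeConfig,
                                  String lastInstantTime) {
    // Construct the client, use it for a single operation, then close it so any
    // services it started are released before the next test runs.
    SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig);
    try {
      client.rollback(lastInstantTime);
    } finally {
      client.close();
    }
  }
}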