This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 93932fc40de6 refactor: Add Lombok annotations to hudi-spark,hudi-spark-common (#17718)
93932fc40de6 is described below
commit 93932fc40de69f8577b33b10381ace24cd9395c7
Author: voonhous <[email protected]>
AuthorDate: Sat Dec 27 14:26:21 2025 +0800
refactor: Add Lombok annotations to hudi-spark,hudi-spark-common (#17718)
* refactor: Add Lombok annotations to hudi-spark and hudi-spark-common
* Address non-static logging vars
---
hudi-spark-datasource/hudi-spark-common/pom.xml | 6 ++
.../main/java/org/apache/hudi/DataSourceUtils.java | 20 +++---
.../BaseDatasetBulkInsertCommitActionExecutor.java | 6 +-
.../DatasetBucketRescaleCommitActionExecutor.java | 9 ++-
.../hudi/internal/BaseWriterCommitMessage.java | 21 ++----
.../internal/DataSourceInternalWriterHelper.java | 32 ++++-----
hudi-spark-datasource/hudi-spark/pom.xml | 6 ++
.../main/java/org/apache/hudi/QuickstartUtils.java | 6 +-
.../org/apache/hudi/cli/ArchiveExecutorUtils.java | 13 ++--
.../apache/hudi/cli/BootstrapExecutorUtils.java | 19 +++--
.../apache/hudi/cli/HDFSParquetImporterUtils.java | 23 +++----
.../hudi-spark/src/test/java/HoodieJavaApp.java | 16 ++---
.../src/test/java/HoodieJavaGenerateApp.java | 10 ++-
.../src/test/java/HoodieJavaStreamingApp.java | 43 ++++++------
.../hudi/client/TestTableSchemaEvolution.java | 8 +--
.../TestRemoteFileSystemViewWithMetadataTable.java | 17 +++--
.../TestColStatsRecordWithMetadataRecord.java | 8 +--
.../hudi/functional/TestHoodieBackedMetadata.java | 21 +++---
.../TestHoodieDatasetBulkInsertHelper.java | 10 ++-
.../hudi/functional/TestHoodieFileSystemViews.java | 7 +-
.../java/org/apache/hudi/io/CustomPayload.java | 15 ++--
.../java/org/apache/hudi/io/TestMergeHandle.java | 45 ++----------
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 80 +++++++++++-----------
23 files changed, 189 insertions(+), 252 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml
b/hudi-spark-datasource/hudi-spark-common/pom.xml
index e0a555d60982..d60973b7bc11 100644
--- a/hudi-spark-datasource/hudi-spark-common/pom.xml
+++ b/hudi-spark-datasource/hudi-spark-common/pom.xml
@@ -132,6 +132,12 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index fe05c4281761..fd854f8127f4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -49,12 +49,11 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
@@ -70,13 +69,12 @@ import static
org.apache.hudi.common.util.CommitUtils.getCheckpointValueAsString
/**
* Utilities used throughout the data source.
*/
+@Slf4j
public class DataSourceUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(DataSourceUtils.class);
-
public static String getTablePath(HoodieStorage storage,
List<StoragePath> userProvidedPaths)
throws IOException {
- LOG.info("Getting table path..");
+ log.info("Getting table path..");
for (StoragePath path : userProvidedPaths) {
try {
Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage,
path);
@@ -84,7 +82,7 @@ public class DataSourceUtils {
return tablePath.get().toString();
}
} catch (HoodieException he) {
- LOG.warn("Error trying to get table path from {}", path.toString(),
he);
+ log.warn("Error trying to get table path from {}", path.toString(),
he);
}
}
@@ -335,17 +333,17 @@ public class DataSourceUtils {
totalErroredRecords,
errorCount);
- LOG.error(errorSummary);
+ log.error(errorSummary);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Printing out the top 100 errors");
+ if (log.isTraceEnabled()) {
+ log.trace("Printing out the top 100 errors");
HoodieJavaRDD.getJavaRDD(writeStatusesOpt.get()).filter(WriteStatus::hasErrors)
.take(100)
.forEach(ws -> {
- LOG.trace("Global error:", ws.getGlobalError());
+ log.trace("Global error:", ws.getGlobalError());
if (!ws.getErrors().isEmpty()) {
- ws.getErrors().forEach((k, v) -> LOG.trace("Error for key
{}: {}", k, v));
+ ws.getErrors().forEach((k, v) -> log.trace("Error for key
{}: {}", k, v));
}
});
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 194c837f3d7c..75b5e6637f9a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import lombok.Getter;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -57,6 +58,7 @@ public abstract class
BaseDatasetBulkInsertCommitActionExecutor implements Seria
protected final transient HoodieWriteConfig writeConfig;
protected final transient SparkRDDWriteClient writeClient;
+ @Getter
protected String instantTime;
protected HoodieTable table;
@@ -139,8 +141,4 @@ public abstract class
BaseDatasetBulkInsertCommitActionExecutor implements Seria
}
protected abstract Map<String, List<String>>
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses);
-
- public String getInstantTime() {
- return instantTime;
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index c18bf38a8ba7..9f9e9ae9288f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -28,19 +28,18 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import
org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows;
import org.apache.hudi.table.BulkInsertPartitioner;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+@Slf4j
public class DatasetBucketRescaleCommitActionExecutor extends
DatasetBulkInsertOverwriteCommitActionExecutor {
private static final long serialVersionUID = 1L;
-
- private static final Logger LOG =
LoggerFactory.getLogger(DatasetBucketRescaleCommitActionExecutor.class);
private final String expression;
private final String rule;
private final int bucketNumber;
@@ -71,7 +70,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends
DatasetBulkInsertO
bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION,
instantTime);
boolean res =
PartitionBucketIndexHashingConfig.saveHashingConfig(hashingConfig,
table.getMetaClient());
ValidationUtils.checkArgument(res);
- LOG.info("Finish to save hashing config {}", hashingConfig);
+ log.info("Finish to save hashing config {}", hashingConfig);
}
@Override
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
index d8e0e5372e70..1ec84269b6b7 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
@@ -20,27 +20,20 @@ package org.apache.hudi.internal;
import org.apache.hudi.client.WriteStatus;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
import java.io.Serializable;
-import java.util.Arrays;
import java.util.List;
/**
* Base class for HoodieWriterCommitMessage used by Spark datasource v2.
*/
+@AllArgsConstructor
+@Getter
+@ToString
public class BaseWriterCommitMessage implements Serializable {
private final List<WriteStatus> writeStatuses;
-
- public BaseWriterCommitMessage(List<WriteStatus> writeStatuses) {
- this.writeStatuses = writeStatuses;
- }
-
- public List<WriteStatus> getWriteStatuses() {
- return writeStatuses;
- }
-
- @Override
- public String toString() {
- return "HoodieWriterCommitMessage{" + "writeStatuses=" +
Arrays.toString(writeStatuses.toArray()) + '}';
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index ea30c4b95c5f..902235a2acbd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -33,11 +33,11 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieTable;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@@ -47,26 +47,28 @@ import java.util.stream.Collectors;
/**
* Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
*/
+@Slf4j
public class DataSourceInternalWriterHelper {
- private static final Logger LOG =
LoggerFactory.getLogger(DataSourceInternalWriterHelper.class);
public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time";
private final String instantTime;
private final HoodieTableMetaClient metaClient;
private final SparkRDDWriteClient writeClient;
+ @Getter
private final HoodieTable hoodieTable;
- private final WriteOperationType operationType;
+ @Getter
+ private final WriteOperationType writeOperationType;
private final Map<String, String> extraMetadata;
public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig
writeConfig, StructType structType,
SparkSession sparkSession,
StorageConfiguration<?> storageConf, Map<String, String> extraMetadata) {
this.instantTime = instantTime;
- this.operationType = WriteOperationType.BULK_INSERT;
+ this.writeOperationType = WriteOperationType.BULK_INSERT;
this.extraMetadata = extraMetadata;
this.writeClient = new SparkRDDWriteClient<>(new
HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())),
writeConfig);
- this.writeClient.setOperationType(operationType);
- this.hoodieTable = this.writeClient.initTable(operationType,
Option.of(instantTime));
+ this.writeClient.setOperationType(writeOperationType);
+ this.hoodieTable = this.writeClient.initTable(writeOperationType,
Option.of(instantTime));
this.metaClient = HoodieTableMetaClient.builder()
.setConf(storageConf.newInstance()).setBasePath(writeConfig.getBasePath()).build();
@@ -79,14 +81,14 @@ public class DataSourceInternalWriterHelper {
}
public void onDataWriterCommit(String message) {
- LOG.info("Received commit of a data writer = {}", message);
+ log.info("Received commit of a data writer = {}", message);
}
public void commit(List<WriteStatus> writeStatuses) {
try {
List<HoodieWriteStat> writeStatList =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
writeClient.commitStats(instantTime, new TableWriteStats(writeStatList),
Option.of(extraMetadata),
- CommitUtils.getCommitActionType(operationType,
metaClient.getTableType()), Collections.emptyMap(), Option.empty(),
+ CommitUtils.getCommitActionType(writeOperationType,
metaClient.getTableType()), Collections.emptyMap(), Option.empty(),
true, Option.empty());
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
@@ -96,22 +98,14 @@ public class DataSourceInternalWriterHelper {
}
public void abort() {
- LOG.error("Commit " + instantTime + " aborted ");
+ log.error("Commit {} aborted ", instantTime);
writeClient.close();
}
public String createInflightCommit() {
metaClient.getActiveTimeline().transitionRequestedToInflight(
metaClient.createNewInstant(State.REQUESTED,
- CommitUtils.getCommitActionType(operationType,
metaClient.getTableType()), instantTime), Option.empty());
+ CommitUtils.getCommitActionType(writeOperationType,
metaClient.getTableType()), instantTime), Option.empty());
return instantTime;
}
-
- public HoodieTable getHoodieTable() {
- return hoodieTable;
- }
-
- public WriteOperationType getWriteOperationType() {
- return operationType;
- }
}
diff --git a/hudi-spark-datasource/hudi-spark/pom.xml
b/hudi-spark-datasource/hudi-spark/pom.xml
index a8672ee1b6bc..f8376eb5ee39 100644
--- a/hudi-spark-datasource/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark/pom.xml
@@ -155,6 +155,12 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hoodie - Spark-->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
index 996bda01deb3..3d1e929d7cac 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import lombok.Getter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
@@ -69,6 +70,7 @@ public class QuickstartUtils {
private final Map<Integer, HoodieKey> existingKeys;
private final String[] partitionPaths;
+ @Getter
private int numExistingKeys;
public DataGenerator() {
@@ -96,10 +98,6 @@ public class QuickstartUtils {
return buffer.toString();
}
- public int getNumExistingKeys() {
- return numExistingKeys;
- }
-
public static GenericRecord generateGenericRecord(String rowKey, String
riderName, String driverName,
long timestamp) {
GenericRecord rec = new GenericData.Record(schema.getAvroSchema());
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
index da30c4c865bd..772450903e5f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
@@ -34,20 +34,19 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.util.CommonClientUtils;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Archive Utils.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
public final class ArchiveExecutorUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(ArchiveExecutorUtils.class);
-
- private ArchiveExecutorUtils() {
- }
public static int archive(JavaSparkContext jsc,
int minCommits,
@@ -69,7 +68,7 @@ public final class ArchiveExecutorUtils {
TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(),
config, table);
archiver.archiveIfRequired(context, true);
} catch (IOException ioe) {
- LOG.error("Failed to archive with IOException: {}", ioe.getMessage());
+ log.error("Failed to archive with IOException: {}", ioe.getMessage());
throw ioe;
}
return 0;
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index f934f039b231..b4d6c47e8701 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -48,12 +48,11 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.util.SparkKeyGenUtils;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -82,10 +81,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
* Performs bootstrap from a non-hudi source.
* import static org.apache.hudi.common.util.ConfigUtils.filterProperties;
*/
+@Slf4j
public class BootstrapExecutorUtils implements Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(BootstrapExecutorUtils.class);
-
/**
* Config.
*/
@@ -159,7 +157,7 @@ public class BootstrapExecutorUtils implements Serializable
{
builder =
builder.withSchema(schemaProvider.getTargetSchema().toString());
}
this.bootstrapConfig = builder.build();
- LOG.info("Created bootstrap executor with configs : " +
bootstrapConfig.getProps());
+ log.info("Created bootstrap executor with configs : {}",
bootstrapConfig.getProps());
}
public static SchemaProvider createSchemaProvider(String
schemaProviderClass, TypedProperties cfg,
@@ -185,7 +183,7 @@ public class BootstrapExecutorUtils implements Serializable
{
} catch (Exception e) {
Path basePath = new Path(cfg.basePath);
if (fs.exists(basePath)) {
- LOG.info("deleted target base path {}", cfg.basePath);
+ log.info("deleted target base path {}", cfg.basePath);
fs.delete(basePath, true);
}
throw new HoodieException("Failed to bootstrap table", e);
@@ -219,7 +217,7 @@ public class BootstrapExecutorUtils implements Serializable
{
Path basePath = new Path(cfg.basePath);
if (fs.exists(basePath)) {
if (cfg.bootstrapOverwrite) {
- LOG.info("Target base path already exists, overwrite it");
+ log.info("Target base path already exists, overwrite it");
fs.delete(basePath, true);
} else {
throw new HoodieException("target base path already exists at " +
cfg.basePath
@@ -302,7 +300,14 @@ public class BootstrapExecutorUtils implements
Serializable {
return Collections.emptyMap();
}
+ /**
+ * Configuration class for Bootstrap operations.
+ * Note: Explicit setters are used instead of Lombok's @Setter annotation because this class is accessed from Scala code
+ * (RunBootstrapProcedure.scala). Since Scala compilation happens before Java compilation in the Maven build lifecycle,
+ * Lombok-generated methods would not be visible to the Scala compiler, causing compilation errors.
+ */
public static class Config {
+
private String database;
private String tableName;
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
index 8de68192393d..7fc06fc4e380 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
@@ -43,6 +43,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -55,8 +56,6 @@ import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
@@ -74,10 +73,10 @@ import static
org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
/**
* Loads data from Parquet Sources.
*/
+@Slf4j
public class HDFSParquetImporterUtils implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HDFSParquetImporterUtils.class);
private static final DateTimeFormatter PARTITION_FORMATTER =
DateTimeFormatter.ofPattern("yyyy/MM/dd")
.withZone(ZoneId.systemDefault());
@@ -128,7 +127,7 @@ public class HDFSParquetImporterUtils implements
Serializable {
FileSystem fs = HadoopFSUtils.getFs(this.targetPath,
jsc.hadoopConfiguration());
this.props = this.propsFilePath == null || this.propsFilePath.isEmpty() ?
buildProperties(this.configs)
: readConfig(fs.getConf(), new StoragePath(this.propsFilePath),
this.configs).getProps(true);
- LOG.info("Starting data import with configs : " + props.toString());
+ log.info("Starting data import with configs : {}", props.toString());
int ret = -1;
try {
// Verify that targetPath is not present.
@@ -139,7 +138,7 @@ public class HDFSParquetImporterUtils implements
Serializable {
ret = dataImport(jsc, fs);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
- LOG.error("dataImport failed", t);
+ log.error("dataImport failed", t);
}
return ret;
}
@@ -171,7 +170,7 @@ public class HDFSParquetImporterUtils implements
Serializable {
JavaRDD<WriteStatus> writeResponse = load(client, instantTime,
hoodieRecords);
return handleErrors(jsc, instantTime, writeResponse);
} catch (Throwable t) {
- LOG.error("Error occurred.", t);
+ log.error("Error occurred.", t);
}
return -1;
}
@@ -202,13 +201,13 @@ public class HDFSParquetImporterUtils implements
Serializable {
throw new HoodieIOException("row field is missing. :" +
this.rowKey);
}
String partitionPath = partitionField.toString();
- LOG.debug("Row Key : {}, Partition Path is ({}})", rowField,
partitionPath);
+ log.debug("Row Key : {}, Partition Path is ({}})", rowField,
partitionPath);
if (partitionField instanceof Number) {
try {
long ts = (long) (Double.parseDouble(partitionField.toString())
* 1000L);
partitionPath =
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
} catch (NumberFormatException nfe) {
- LOG.warn("Unable to parse date from partition field. Assuming
partition as ({})", partitionField);
+ log.warn("Unable to parse date from partition field. Assuming
partition as ({})", partitionField);
}
}
return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(),
partitionPath),
@@ -259,7 +258,7 @@ public class HDFSParquetImporterUtils implements
Serializable {
DFSPropertiesConfiguration conf = new
DFSPropertiesConfiguration(hadoopConfig, cfgPath);
try {
if (!overriddenProps.isEmpty()) {
- LOG.info("Adding overridden properties to file properties.");
+ log.info("Adding overridden properties to file properties.");
conf.addPropsFromStream(new BufferedReader(new
StringReader(String.join("\n", overriddenProps))), cfgPath);
}
} catch (IOException ioe) {
@@ -320,14 +319,14 @@ public class HDFSParquetImporterUtils implements
Serializable {
writeResponse.foreach(writeStatus -> {
if (writeStatus.hasErrors()) {
errors.add(1);
- LOG.error("Error processing records :writeStatus:{}",
writeStatus.getStat());
+ log.error("Error processing records :writeStatus:{}",
writeStatus.getStat());
}
});
if (errors.value() == 0) {
- LOG.info("Table imported into hoodie with {} instant time.",
instantTime);
+ log.info("Table imported into hoodie with {} instant time.",
instantTime);
return 0;
}
- LOG.error("Import failed with {} errors.", errors.value());
+ log.error("Import failed with {} errors.", errors.value());
return -1;
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index 02a101fe120d..5273527d6797 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -32,6 +32,7 @@ import
org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
@@ -40,8 +41,6 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -60,6 +59,7 @@ import static
org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTes
/**
* Sample program that writes & reads hoodie tables via the Spark datasource.
*/
+@Slf4j
public class HoodieJavaApp {
@Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie
sample table")
@@ -98,8 +98,6 @@ public class HoodieJavaApp {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieJavaApp.class);
-
public static void main(String[] args) throws Exception {
HoodieJavaApp cli = new HoodieJavaApp();
JCommander cmd = new JCommander(cli, null, args);
@@ -170,7 +168,7 @@ public class HoodieJavaApp {
// new dataset if needed
writer.save(tablePath); // ultimately where the dataset will be placed
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs,
tablePath);
- LOG.info("First commit at instant time :" + commitInstantTime1);
+ log.info("First commit at instant time: {}", commitInstantTime1);
/**
* Commit that updates records
@@ -193,7 +191,7 @@ public class HoodieJavaApp {
updateHiveSyncConfig(writer);
writer.save(tablePath);
String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs,
tablePath);
- LOG.info("Second commit at instant time :" + commitInstantTime2);
+ log.info("Second commit at instant time: {}", commitInstantTime2);
/**
* Commit that Deletes some records
@@ -218,7 +216,7 @@ public class HoodieJavaApp {
updateHiveSyncConfig(writer);
writer.save(tablePath);
String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs,
tablePath);
- LOG.info("Third commit at instant time :" + commitInstantTime3);
+ log.info("Third commit at instant time: {}", commitInstantTime3);
/**
* Read & do some queries
@@ -240,7 +238,7 @@ public class HoodieJavaApp {
// For incremental view, pass in the root/base path of dataset
.load(tablePath);
- LOG.info("You will only see records from : " + commitInstantTime2);
+ log.info("You will only see records from: {}", commitInstantTime2);
incQueryDF.groupBy(incQueryDF.col("_hoodie_commit_time")).count().show();
}
}
@@ -250,7 +248,7 @@ public class HoodieJavaApp {
*/
private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row>
writer) {
if (enableHiveSync) {
- LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+ log.info("Enabling Hive sync to {}", hiveJdbcUrl);
writer = writer.option(META_SYNC_TABLE_NAME.key(), hiveTable)
.option(META_SYNC_DATABASE_NAME.key(), hiveDB)
.option(HIVE_URL.key(), hiveJdbcUrl)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
index 1451ce469be5..0c2254461325 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
@@ -31,14 +31,13 @@ import
org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -53,6 +52,7 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NA
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static
org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
+@Slf4j
public class HoodieJavaGenerateApp {
@Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie
sample table")
private String tablePath = "file:///tmp/hoodie/sample-table";
@@ -93,8 +93,6 @@ public class HoodieJavaGenerateApp {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieJavaGenerateApp.class);
-
public static void main(String[] args) throws Exception {
HoodieJavaGenerateApp cli = new HoodieJavaGenerateApp();
JCommander cmd = new JCommander(cli, null, args);
@@ -132,7 +130,7 @@ public class HoodieJavaGenerateApp {
*/
private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row>
writer) {
if (enableHiveSync) {
- LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+ log.info("Enabling Hive sync to {}", hiveJdbcUrl);
writer = writer.option(META_SYNC_TABLE_NAME.key(), hiveTable)
.option(META_SYNC_DATABASE_NAME.key(), hiveDB)
.option(HIVE_URL.key(), hiveJdbcUrl)
@@ -194,6 +192,6 @@ public class HoodieJavaGenerateApp {
writer.save(tablePath); // ultimately where the dataset will be placed
FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs,
tablePath);
- LOG.info("Commit at instant time :" + commitInstantTime1);
+ log.info("Commit at instant time: {}", commitInstantTime1);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 1e5a5e478a70..54f39a02bc10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -36,6 +36,7 @@ import org.apache.hudi.testutils.HoodieClientTestUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
@@ -47,8 +48,6 @@ import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -68,6 +67,7 @@ import static
org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTes
/**
* Sample program that writes & reads hoodie tables via the Spark datasource
streaming.
*/
+@Slf4j
public class HoodieJavaStreamingApp {
@Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie
sample table")
@@ -114,9 +114,6 @@ public class HoodieJavaStreamingApp {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
-
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieJavaStreamingApp.class);
-
public static void main(String[] args) throws Exception {
HoodieJavaStreamingApp cli = new HoodieJavaStreamingApp();
JCommander cmd = new JCommander(cli, null, args);
@@ -129,7 +126,7 @@ public class HoodieJavaStreamingApp {
try {
cli.run();
} catch (Exception ex) {
- LOG.error("Got error running app ", ex);
+ log.error("Got error running app ", ex);
errStatus = -1;
} finally {
System.exit(errStatus);
@@ -179,17 +176,17 @@ public class HoodieJavaStreamingApp {
// thread for spark structured streaming
try {
Future<Void> streamFuture = executor.submit(() -> {
- LOG.info("===== Streaming Starting =====");
+ log.info("===== Streaming Starting =====");
stream(streamingInput,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL(), ckptPath);
- LOG.info("===== Streaming Ends =====");
+ log.info("===== Streaming Ends =====");
return null;
});
// thread for adding data to the streaming source and showing results
over time
Future<Integer> showFuture = executor.submit(() -> {
- LOG.info("===== Showing Starting =====");
+ log.info("===== Showing Starting =====");
int numCommits = addInputAndValidateIngestion(spark, fs, srcPath,0,
100, inputDF1, inputDF2, true);
- LOG.info("===== Showing Ends =====");
+ log.info("===== Showing Ends =====");
return numCommits;
});
@@ -229,18 +226,18 @@ public class HoodieJavaStreamingApp {
// thread for spark structured streaming
try {
Future<Void> streamFuture = executor.submit(() -> {
- LOG.info("===== Streaming Starting =====");
+ log.info("===== Streaming Starting =====");
stream(delStreamingInput,
DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), ckptPath2);
- LOG.info("===== Streaming Ends =====");
+ log.info("===== Streaming Ends =====");
return null;
});
final int numCommits = numInitialCommits;
// thread for adding data to the streaming source and showing results
over time
Future<Void> showFuture = executor.submit(() -> {
- LOG.info("===== Showing Starting =====");
+ log.info("===== Showing Starting =====");
addInputAndValidateIngestion(newSpark, fs, srcPath2, numCommits, 80,
inputDF3, null, false);
- LOG.info("===== Showing Ends =====");
+ log.info("===== Showing Ends =====");
return null;
});
@@ -261,14 +258,14 @@ public class HoodieJavaStreamingApp {
while ((currTime - beginTime) < timeoutMsecs) {
try {
HoodieTimeline timeline =
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
- LOG.info("Timeline :{}", timeline.getInstants());
+ log.info("Timeline :{}", timeline.getInstants());
if (timeline.countInstants() >= numCommits) {
return;
}
HoodieTableMetaClient metaClient = createMetaClient(new
HadoopStorageConfiguration(fs.getConf()), tablePath);
- LOG.info("Instants :{}", metaClient.getActiveTimeline().getInstants());
+ log.info("Instants :{}", metaClient.getActiveTimeline().getInstants());
} catch (TableNotFoundException te) {
- LOG.info("Got table not found exception. Retrying");
+ log.info("Got table not found exception. Retrying");
} finally {
Thread.sleep(sleepSecsAfterEachRun * 1000);
currTime = System.currentTimeMillis();
@@ -297,7 +294,7 @@ public class HoodieJavaStreamingApp {
// wait for spark streaming to process one microbatch
waitTillNCommits(fs, numExpCommits, 180, 3);
String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs,
tablePath);
- LOG.info("First commit at instant time :" + commitInstantTime1);
+ log.info("First commit at instant time: {}", commitInstantTime1);
String commitInstantTime2 = commitInstantTime1;
if (null != inputDF2) {
@@ -307,7 +304,7 @@ public class HoodieJavaStreamingApp {
Thread.sleep(3000);
waitTillNCommits(fs, numExpCommits, 180, 3);
commitInstantTime2 = HoodieDataSourceHelpers.listCommitsSince(fs,
tablePath, commitInstantTime1).stream().sorted().findFirst().get();
- LOG.info("Second commit at instant time :" + commitInstantTime2);
+ log.info("Second commit at instant time: {}", commitInstantTime2);
}
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
@@ -316,7 +313,7 @@ public class HoodieJavaStreamingApp {
}
// Wait for compaction to also finish and track latest timestamp as
commit timestamp
waitTillNCommits(fs, numExpCommits, 180, 3);
- LOG.info("Compaction commit at instant time :" +
HoodieDataSourceHelpers.latestCommit(fs, tablePath));
+ log.info("Compaction commit at instant time: {}",
HoodieDataSourceHelpers.latestCommit(fs, tablePath));
}
/**
@@ -329,7 +326,7 @@ public class HoodieJavaStreamingApp {
spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from
hoodie_ro where fare.amount > 2.0").show();
if (instantTimeValidation) {
- LOG.info("Showing all records. Latest Instant Time ={}",
commitInstantTime2);
+ log.info("Showing all records. Latest Instant Time ={}",
commitInstantTime2);
spark.sql("select * from hoodie_ro").show(200, false);
long numRecordsAtInstant2 =
spark.sql("select * from hoodie_ro where _hoodie_commit_time = " +
commitInstantTime2).count();
@@ -352,7 +349,7 @@ public class HoodieJavaStreamingApp {
// For incremental view, pass in the root/base path of dataset
.load(tablePath);
- LOG.info("You will only see records from : " + commitInstantTime2);
+ log.info("You will only see records from: {}", commitInstantTime2);
hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
}
return numExpCommits;
@@ -394,7 +391,7 @@ public class HoodieJavaStreamingApp {
*/
private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row>
writer) {
if (enableHiveSync) {
- LOG.info("Enabling Hive sync to " + hiveJdbcUrl);
+ log.info("Enabling Hive sync to {}", hiveJdbcUrl);
writer = writer.option(META_SYNC_TABLE_NAME.key(), hiveTable)
.option(META_SYNC_DATABASE_NAME.key(), hiveDB)
.option(HIVE_URL.key(), hiveJdbcUrl)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 081b962b44a8..d04df0e823e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -40,6 +40,8 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -66,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestTableSchemaEvolution extends HoodieClientTestBase {
private final String initCommitTime = "000";
+ @Getter(AccessLevel.PROTECTED)
private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
private HoodieTestDataGenerator dataGenEvolved = new
HoodieTestDataGenerator();
private HoodieTestDataGenerator dataGenDevolved = new
HoodieTestDataGenerator();
@@ -403,9 +406,4 @@ public class TestTableSchemaEvolution extends
HoodieClientTestBase {
private static boolean isSchemaCompatible(String oldSchema, String
newSchema, boolean shouldAllowDroppedColumns) {
return
HoodieSchemaCompatibility.isSchemaCompatible(HoodieSchema.parse(oldSchema),
HoodieSchema.parse(newSchema), shouldAllowDroppedColumns);
}
-
- @Override
- protected HoodieTableType getTableType() {
- return tableType;
- }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
index c9037aa7b4fe..28ed9741d744 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
@@ -46,14 +46,13 @@ import
org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.timeline.service.TimelineService;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
@@ -76,8 +75,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Tests the {@link RemoteHoodieTableFileSystemView} with metadata table
enabled, using
* {@link org.apache.hudi.common.table.view.HoodieTableFileSystemView} on the
timeline server.
*/
+@Slf4j
public class TestRemoteFileSystemViewWithMetadataTable extends
HoodieSparkClientTestHarness {
- private static final Logger LOG =
LoggerFactory.getLogger(TestRemoteFileSystemViewWithMetadataTable.class);
@BeforeEach
public void setUp() throws Exception {
@@ -119,7 +118,7 @@ public class TestRemoteFileSystemViewWithMetadataTable
extends HoodieSparkClient
context, metaClient.getStorage(),
config.getMetadataConfig(), metaClient.getBasePath().toString(), true)));
timelineService.startService();
timelineServicePort = timelineService.getServerPort();
- LOG.info("Started timeline server on port: " + timelineServicePort);
+ log.info("Started timeline server on port: {}", timelineServicePort);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -199,7 +198,7 @@ public class TestRemoteFileSystemViewWithMetadataTable
extends HoodieSparkClient
? timelineService.getServerPort()
:
writeClient.getTimelineServer().get().getRemoteFileSystemViewConfig(writeClient.getConfig()).getRemoteViewServerPort();
- LOG.info("Connecting to Timeline Server: " + timelineServerPort);
+ log.info("Connecting to Timeline Server: {}", timelineServerPort);
RemoteHoodieTableFileSystemView view =
new RemoteHoodieTableFileSystemView("localhost", timelineServerPort,
newMetaClient);
@@ -217,7 +216,7 @@ public class TestRemoteFileSystemViewWithMetadataTable
extends HoodieSparkClient
try {
return future.get();
} catch (Exception e) {
- LOG.error("Get result error", e);
+ log.error("Get result error", e);
return false;
}
}).reduce((a, b) -> a && b).get());
@@ -278,7 +277,7 @@ public class TestRemoteFileSystemViewWithMetadataTable
extends HoodieSparkClient
* Test callable to send lookup request to the timeline server for the
latest file slice
* based on the partition path and file ID.
*/
- class TestViewLookUpCallable implements Callable<Boolean> {
+ static class TestViewLookUpCallable implements Callable<Boolean> {
private final RemoteHoodieTableFileSystemView view;
private final Pair<String, String> partitionFileIdPair;
private final String expectedCommitTime;
@@ -303,8 +302,8 @@ public class TestRemoteFileSystemViewWithMetadataTable
extends HoodieSparkClient
boolean result = latestFileSlice.isPresent() &&
latestBaseFilePath.startsWith(expectedBasePath)
&& expectedCommitTime.equals(FSUtils.getCommitTime(new
Path(latestBaseFilePath).getName()));
if (!result) {
- LOG.error("The timeline server does not return the correct result:
latestFileSliceReturned="
- + latestFileSlice + " expectedCommitTime=" + expectedCommitTime);
+ log.error("The timeline server does not return the correct result:
latestFileSliceReturned={} expectedCommitTime={}",
+ latestFileSlice, expectedCommitTime);
}
return result;
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
index b4889ce87678..cc81d9a01513 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
@@ -54,14 +54,13 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -87,10 +86,9 @@ import static
org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Slf4j
public class TestColStatsRecordWithMetadataRecord extends
HoodieSparkClientTestHarness {
- private static final Logger LOG =
LoggerFactory.getLogger(TestColStatsRecordWithMetadataRecord.class);
-
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieCreateHandle");
@@ -236,7 +234,7 @@ public class TestColStatsRecordWithMetadataRecord extends
HoodieSparkClientTestH
allRecords.forEach(record -> {
HoodieMetadataColumnStats actualColStatsMetadata =
record.getData().getColumnStatMetadata().get();
HoodieMetadataColumnStats expectedColStatsMetadata =
expectedColumnStatsRecords.get(finalCounter.getAndIncrement()).getData().getColumnStatMetadata().get();
- LOG.info("Validating {}, {}",expectedColStatsMetadata.getColumnName(),
expectedColStatsMetadata.getMinValue().getClass().getSimpleName());
+ log.info("Validating {}, {}",expectedColStatsMetadata.getColumnName(),
expectedColStatsMetadata.getMinValue().getClass().getSimpleName());
if
(expectedColStatsMetadata.getMinValue().getClass().getSimpleName().equals(DecimalWrapper.class.getSimpleName()))
{
// Big decimal gets wrapped w/ Decimal wrapper and converts to bytes.
assertEquals(expectedColStatsMetadata.getMinValue().toString(),
actualColStatsMetadata.getMinValue().toString());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 517ac117e51b..a35348f6e3aa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -122,6 +122,7 @@ import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -136,7 +137,6 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
@@ -211,11 +211,10 @@ import static
org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+@Slf4j
@Tag("functional")
public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieBackedMetadata.class);
-
public static List<Arguments> tableTypeAndEnableOperationArgs() {
return asList(
Arguments.of(COPY_ON_WRITE, true),
@@ -2317,7 +2316,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Ensure all commits were synced to the Metadata Table
HoodieTableMetaClient metadataMetaClient =
createMetaClient(metadataTableBasePath);
- LOG.warn("total commits in metadata table " +
metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
+ log.warn("total commits in metadata table {}",
metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
// 8 commits (3 init + 5 deltacommits) and 2 cleaner commits.
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
10);
@@ -3775,7 +3774,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
verifyMetadataColumnStatsRecords(storage, logFiles);
}
} catch (IOException e) {
- LOG.error("Metadata record validation failed", e);
+ log.error("Metadata record validation failed", e);
fail("Metadata record validation failed");
}
});
@@ -3844,17 +3843,17 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
if ((fsFileNames.size() != metadataFilenames.size())
|| (!fsFileNames.equals(metadataFilenames))) {
- LOG.info("*** File system listing = " +
Arrays.toString(fsFileNames.toArray()));
- LOG.info("*** Metadata listing = " +
Arrays.toString(metadataFilenames.toArray()));
+ log.info("*** File system listing = {}",
Arrays.toString(fsFileNames.toArray()));
+ log.info("*** Metadata listing = {}",
Arrays.toString(metadataFilenames.toArray()));
for (String fileName : fsFileNames) {
if (!metadataFilenames.contains(fileName)) {
- LOG.error(partition + "FsFilename " + fileName + " not found in
Meta data");
+ log.error("{} FsFilename {} not found in Meta data", partition,
fileName);
}
}
for (String fileName : metadataFilenames) {
if (!fsFileNames.contains(fileName)) {
- LOG.error(partition + "Metadata file " + fileName + " not found
in original FS");
+ log.error("{} Metadata file {} not found in original FS",
partition, fileName);
}
}
}
@@ -3941,13 +3940,13 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
verifyMetadataColumnStatsRecords(storage, logFiles);
}
} catch (Exception e) {
- LOG.error("Metadata record validation failed", e);
+ log.error("Metadata record validation failed", e);
fail("Metadata record validation failed");
}
});
// TODO: include validation for record_index partition here.
- LOG.info("Validation time=" + timer.endTimer());
+ log.info("Validation time={}", timer.endTimer());
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index ec4a9c16d76f..946fb61c3b3a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -18,16 +18,16 @@
package org.apache.hudi.functional;
import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieDatasetBulkInsertHelper;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
+import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -36,6 +36,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.testutils.DataSourceTestUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+import lombok.Getter;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.scheduler.SparkListener;
@@ -385,6 +386,7 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
private boolean checkFlag = false;
private String checkMessage;
+ @Getter
private int parallelism;
StageCheckBulkParallelismListener(String checkMessage) {
@@ -402,9 +404,5 @@ public class TestHoodieDatasetBulkInsertHelper extends
HoodieSparkClientTestBase
checkFlag = true;
}
}
-
- public int getParallelism() {
- return parallelism;
- }
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
index 387a87b902fa..6b3173a76a4b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieFileSystemViews.java
@@ -51,6 +51,8 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.HoodieClientTestBase;
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -80,12 +82,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class TestHoodieFileSystemViews extends HoodieClientTestBase {
+ @Getter(AccessLevel.PROTECTED)
private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
- protected HoodieTableType getTableType() {
- return tableType;
- }
-
public static List<Arguments> tableTypeMetadataFSVTypeArgs() {
List<Arguments> testCases = new ArrayList<>();
for (HoodieTableType tableType : HoodieTableType.values()) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/CustomPayload.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/CustomPayload.java
index cfc111fe6a58..d9864d840925 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/CustomPayload.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/CustomPayload.java
@@ -22,21 +22,21 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
+@AllArgsConstructor
public class CustomPayload implements HoodieRecordPayload<CustomPayload> {
+
private final GenericRecord record;
+ @Getter
private final Comparable orderingValue;
- public CustomPayload(GenericRecord record, Comparable orderingValue) {
- this.record = record;
- this.orderingValue = orderingValue;
- }
-
@Override
public CustomPayload preCombine(CustomPayload other) {
return this; // No-op for this test
@@ -71,9 +71,4 @@ public class CustomPayload implements
HoodieRecordPayload<CustomPayload> {
public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
return Option.ofNullable(record);
}
-
- @Override
- public Comparable getOrderingValue() {
- return orderingValue;
- }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
index 8f819cbea0a8..e8f3a4899c0e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -67,6 +67,8 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -620,7 +622,10 @@ public class TestMergeHandle extends BaseTestHandle {
assertEquals(expectedTotalDeletedRecords, writeStat.getNumDeletes());
}
+ @AllArgsConstructor
+ @Getter
class InputAndExpectedDataSet {
+
private final Map<String, HoodieRecord> expectedRecordsMap;
private final int expectedUpdates;
private final int expectedDeletes;
@@ -628,45 +633,5 @@ public class TestMergeHandle extends BaseTestHandle {
private final List<HoodieRecord> newInserts;
private final List<HoodieRecord> validUpdates;
private final Map<String, HoodieRecord> validDeletes;
-
- public InputAndExpectedDataSet(Map<String, HoodieRecord>
expectedRecordsMap, int expectedUpdates, int expectedDeletes,
- List<HoodieRecord> recordsToMerge,
List<HoodieRecord> newInserts, List<HoodieRecord> validUpdates,
- Map<String, HoodieRecord> validDeletes) {
- this.expectedRecordsMap = expectedRecordsMap;
- this.expectedUpdates = expectedUpdates;
- this.expectedDeletes = expectedDeletes;
- this.recordsToMerge = recordsToMerge;
- this.validUpdates = validUpdates;
- this.newInserts = newInserts;
- this.validDeletes = validDeletes;
- }
-
- public Map<String, HoodieRecord> getExpectedRecordsMap() {
- return expectedRecordsMap;
- }
-
- public int getExpectedUpdates() {
- return expectedUpdates;
- }
-
- public int getExpectedDeletes() {
- return expectedDeletes;
- }
-
- public List<HoodieRecord> getRecordsToMerge() {
- return recordsToMerge;
- }
-
- public List<HoodieRecord> getNewInserts() {
- return newInserts;
- }
-
- public List<HoodieRecord> getValidUpdates() {
- return validUpdates;
- }
-
- public Map<String, HoodieRecord> getValidDeletes() {
- return validDeletes;
- }
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 0f2c007e45a0..4c66e451fbcf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -45,6 +45,7 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -55,8 +56,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
@@ -80,9 +79,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Test class for upgrade/downgrade operations using pre-created fixture tables
* from different Hudi releases.
*/
+@Slf4j
public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
-
- private static final Logger LOG =
LoggerFactory.getLogger(TestUpgradeDowngrade.class);
@TempDir
java.nio.file.Path tempDir;
@@ -123,7 +121,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion,
HoodieTableVersion toVersion, String suffix) throws Exception {
boolean isUpgrade = fromVersion.lesserThan(toVersion);
String operation = isUpgrade ? "upgrade" : "downgrade";
- LOG.info("Testing {} from version {} to {}", operation, fromVersion,
toVersion);
+ log.info("Testing {} from version {} to {}", operation, fromVersion,
toVersion);
HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion,
suffix);
assertEquals(fromVersion,
originalMetaClient.getTableConfig().getTableVersion(),
@@ -171,13 +169,13 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
assertTrue(finalCompletedCommits >= initialCompletedCommits,
"Completed commits should be preserved or increased after " +
operation);
- LOG.info("Successfully completed {} test for version {} -> {}", operation,
fromVersion, toVersion);
+ log.info("Successfully completed {} test for version {} -> {}", operation,
fromVersion, toVersion);
}
@ParameterizedTest
@MethodSource("versionsBelowSix")
public void testUpgradeForVersionsStartingBelowSixBlocked(HoodieTableVersion
originalVersion) throws Exception {
- LOG.info("Testing auto-upgrade disabled for version {} (below SIX)",
originalVersion);
+ log.info("Testing auto-upgrade disabled for version {} (below SIX)",
originalVersion);
HoodieTableMetaClient originalMetaClient =
loadFixtureTable(originalVersion);
HoodieTableVersion targetVersion = getNextVersion(originalVersion).get();
@@ -200,13 +198,13 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
@ParameterizedTest
@MethodSource("versionsSixAndAbove")
public void testAutoUpgradeDisabledForVersionsSixAndAbove(HoodieTableVersion
originalVersion) throws Exception {
- LOG.info("Testing auto-upgrade disabled for version {} (SIX and above)",
originalVersion);
+ log.info("Testing auto-upgrade disabled for version {} (SIX and above)",
originalVersion);
HoodieTableMetaClient originalMetaClient =
loadFixtureTable(originalVersion, "-mor");
Option<HoodieTableVersion> targetVersionOpt =
getNextVersion(originalVersion);
if (!targetVersionOpt.isPresent()) {
- LOG.info("Skipping auto-upgrade test for version {} (no higher version
available)", originalVersion);
+ log.info("Skipping auto-upgrade test for version {} (no higher version
available)", originalVersion);
return;
}
HoodieTableVersion targetVersion = targetVersionOpt.get();
@@ -227,7 +225,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
validateVersionSpecificProperties(unchangedMetaClient, originalVersion);
performDataValidationOnTable(unchangedMetaClient, "after auto-upgrade
disabled test");
- LOG.info("Auto-upgrade disabled test passed for version {}",
originalVersion);
+ log.info("Auto-upgrade disabled test passed for version {}",
originalVersion);
}
/**
@@ -252,7 +250,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
@MethodSource("writeTableVersionTestCases")
public void testAutoUpgradeWithWriteTableVersionConfiguration(
Option<HoodieTableVersion> writeTableVersion, HoodieTableVersion
expectedVersion, String description) throws Exception {
- LOG.info("Testing auto-upgrade configuration: {}", description);
+ log.info("Testing auto-upgrade configuration: {}", description);
HoodieTableMetaClient originalMetaClient =
loadFixtureTable(HoodieTableVersion.SIX, "-mor");
assertEquals(HoodieTableVersion.SIX,
originalMetaClient.getTableConfig().getTableVersion(),
"Fixture table should start at version SIX");
@@ -289,7 +287,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
@Test
public void testNeedsUpgradeWithAutoUpgradeDisabledAndWriteVersionOverride()
throws Exception {
- LOG.info("Testing needsUpgrade with auto-upgrade disabled and write
version override");
+ log.info("Testing needsUpgrade with auto-upgrade disabled and write
version override");
// Test case: Table at version 6, write version set to 8, auto-upgrade
disabled
// Expected: needsUpgrade should return false and set write version to
match table version
@@ -328,7 +326,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
@ParameterizedTest
@MethodSource("blockedDowngradeVersionPairs")
public void testDowngradeToVersionsBelowSixBlocked(HoodieTableVersion
fromVersion, HoodieTableVersion toVersion) throws Exception {
- LOG.info("Testing blocked downgrade from version {} to {} (below SIX)",
fromVersion, toVersion);
+ log.info("Testing blocked downgrade from version {} to {} (below SIX)",
fromVersion, toVersion);
HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion);
assertEquals(fromVersion,
originalMetaClient.getTableConfig().getTableVersion(),
@@ -486,7 +484,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
String fixtureName = getFixtureName(version, suffix);
String resourcePath = getFixturesBasePath(suffix) + fixtureName;
- LOG.info("Loading fixture from resource path: {}", resourcePath);
+ log.info("Loading fixture from resource path: {}", resourcePath);
HoodieTestUtils.extractZipToDirectory(resourcePath, tempDir, getClass());
String tableName = fixtureName.replace(".zip", "");
@@ -497,7 +495,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
.setBasePath(tablePath)
.build();
- LOG.info("Loaded fixture table {} at version {}", fixtureName,
metaClient.getTableConfig().getTableVersion());
+ log.info("Loaded fixture table {} at version {}", fixtureName,
metaClient.getTableConfig().getTableVersion());
return metaClient;
}
@@ -508,7 +506,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
String fixtureName = "hudi-v" + version.versionCode() + "-table-payload-"
+ payloadType + ".zip";
String resourcePath = "/upgrade-downgrade-fixtures/payload-tables/" +
fixtureName;
- LOG.info("Loading payload fixture from resource path: {}", resourcePath);
+ log.info("Loading payload fixture from resource path: {}", resourcePath);
HoodieTestUtils.extractZipToDirectory(resourcePath, tempDir, getClass());
String tableName = fixtureName.replace(".zip", "");
@@ -519,7 +517,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
.setBasePath(tablePath)
.build();
- LOG.info("Loaded payload fixture table {} at version {}", fixtureName,
metaClient.getTableConfig().getTableVersion());
+ log.info("Loaded payload fixture table {} at version {}", fixtureName,
metaClient.getTableConfig().getTableVersion());
return metaClient;
}
@@ -662,12 +660,12 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
if (expectedVersion.greaterThanOrEquals(HoodieTableVersion.FOUR)) {
StoragePath metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
if (metaClient.getStorage().exists(metadataTablePath)) {
- LOG.info("Verifying metadata table version at: {}", metadataTablePath);
+ log.info("Verifying metadata table version at: {}", metadataTablePath);
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
.setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
assertTableVersion(mdtMetaClient, expectedVersion);
} else {
- LOG.info("Metadata table does not exist at: {}", metadataTablePath);
+ log.info("Metadata table does not exist at: {}", metadataTablePath);
}
}
}
@@ -683,7 +681,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
*/
private void validateVersionSpecificProperties(
HoodieTableMetaClient metaClient, HoodieTableVersion version) throws
IOException {
- LOG.info("Validating version-specific properties for version {}", version);
+ log.info("Validating version-specific properties for version {}", version);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
@@ -699,7 +697,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
validateVersion9Properties(metaClient, tableConfig);
break;
default:
- LOG.warn("No specific property validation for version {}", version);
+ log.warn("No specific property validation for version {}", version);
}
}
@@ -727,7 +725,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
"TABLE_METADATA_PARTITIONS should contain 'files' partition when
metadata table exists");
} else {
// Metadata table doesn't exist (likely after downgrade) - validation
not applicable
- LOG.info("Skipping TABLE_METADATA_PARTITIONS 'files' validation -
metadata table does not exist (likely after downgrade operation)");
+ log.info("Skipping TABLE_METADATA_PARTITIONS 'files' validation -
metadata table does not exist (likely after downgrade operation)");
}
}
}
@@ -743,7 +741,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
*/
private void validateLogFilesCount(HoodieTableMetaClient metaClient, String
operation, boolean expectLogFiles) {
String validationPhase = expectLogFiles ? "before" : "after";
- LOG.info("Validating log files {} rollback and compaction during {}",
validationPhase, operation);
+ log.info("Validating log files {} rollback and compaction during {}",
validationPhase, operation);
// Get the latest completed commit to ensure we're looking at a consistent
state
org.apache.hudi.common.table.timeline.HoodieTimeline completedTimeline =
@@ -782,7 +780,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
assertEquals(0, totalLogFiles,
"No log files should remain after rollback and compaction during "
+ operation);
}
- LOG.info("Log file validation passed: {} log files found (expected:
{})",
+ log.info("Log file validation passed: {} log files found (expected:
{})",
totalLogFiles, expectLogFiles ? ">0" : "0");
} catch (Exception e) {
throw new RuntimeException("Failed to validate log files during " +
operation, e);
@@ -825,17 +823,17 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
// If table is partitioned, validate partition path migration
if (tableConfig.isTablePartitioned()) {
- LOG.info("Validating V5 partition path migration for partitioned table");
+ log.info("Validating V5 partition path migration for partitioned table");
// Check hive-style partitioning configuration
boolean hiveStylePartitioningEnable =
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
- LOG.info("Hive-style partitioning enabled: {}",
hiveStylePartitioningEnable);
+ log.info("Hive-style partitioning enabled: {}",
hiveStylePartitioningEnable);
// Validate partition field configuration exists
assertTrue(tableConfig.getPartitionFields().isPresent(),
"Partition fields should be present for partitioned table in V5");
} else {
- LOG.info("Non-partitioned table - skipping partition path validation for
V5");
+ log.info("Non-partitioned table - skipping partition path validation for
V5");
}
}
@@ -848,12 +846,12 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
if (!metaClient.getStorage().exists(auxPath)) {
// Auxiliary folder doesn't exist - this is valid, nothing to clean up
- LOG.info("V6 validation passed: Auxiliary folder does not exist");
+ log.info("V6 validation passed: Auxiliary folder does not exist");
return;
}
// Auxiliary folder exists - validate that REQUESTED compaction files were
cleaned up
- LOG.info("V6 validation: Checking auxiliary folder cleanup at: {}",
auxPath);
+ log.info("V6 validation: Checking auxiliary folder cleanup at: {}",
auxPath);
// Get pending compaction timeline with REQUESTED state (same as upgrade
handler)
HoodieTimeline compactionTimeline =
metaClient.getActiveTimeline().filterPendingCompactionTimeline()
@@ -873,7 +871,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
}
});
- LOG.info("V6 validation passed: {} REQUESTED compaction instants verified
to be cleaned up from auxiliary folder",
+ log.info("V6 validation passed: {} REQUESTED compaction instants verified
to be cleaned up from auxiliary folder",
compactionTimeline.countInstants());
}
@@ -922,7 +920,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
* Read table data for validation purposes.
*/
private Dataset<Row> readTableData(HoodieTableMetaClient metaClient, String
stage) {
- LOG.info("Reading table data {}", stage);
+ log.info("Reading table data {}", stage);
try {
String basePath = metaClient.getBasePath().toString();
@@ -940,10 +938,10 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
// Convert collected rows back to Dataset for use in validation
Dataset<Row> materializedData = sqlContext().createDataFrame(rows,
tableData.schema());
- LOG.info("Successfully read and materialized table data {} ({} rows)",
stage, rowCount);
+ log.info("Successfully read and materialized table data {} ({} rows)",
stage, rowCount);
return materializedData;
} catch (Exception e) {
- LOG.error("Failed to read table data {} from: {} (version: {})",
+ log.error("Failed to read table data {} from: {} (version: {})",
stage, metaClient.getBasePath(),
metaClient.getTableConfig().getTableVersion(), e);
throw new RuntimeException("Failed to read table data " + stage, e);
}
@@ -954,7 +952,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
* This ensures that upgrade/downgrade operations preserve data integrity.
*/
private void validateDataConsistency(Dataset<Row> originalData,
HoodieTableMetaClient metaClient, String stage) {
- LOG.info("Validating data consistency {}", stage);
+ log.info("Validating data consistency {}", stage);
try {
Dataset<Row> currentData = readTableData(metaClient, stage);
@@ -969,26 +967,26 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
.collect(Collectors.toSet());
if (columnsToValidate.isEmpty()) {
- LOG.info("Skipping data consistency validation {} (no business columns
to validate)", stage);
+ log.info("Skipping data consistency validation {} (no business columns
to validate)", stage);
return;
}
- LOG.info("Validating data columns: {}", columnsToValidate);
+ log.info("Validating data columns: {}", columnsToValidate);
boolean dataConsistent = areDataframesEqual(originalData, currentData,
columnsToValidate);
assertTrue(dataConsistent, " data should be consistent between original
and " + stage + " states");
- LOG.info("Data consistency validation passed {}", stage);
+ log.info("Data consistency validation passed {}", stage);
} catch (Exception e) {
throw new RuntimeException("Data consistency validation failed " +
stage, e);
}
}
private void performDataValidationOnTable(HoodieTableMetaClient metaClient,
String stage) {
- LOG.info("Performing data validation on table {}", stage);
+ log.info("Performing data validation on table {}", stage);
try {
Dataset<Row> tableData = readTableData(metaClient, stage);
- LOG.info("Data validation passed {} (table accessible, {} rows)", stage,
tableData.count());
+ log.info("Data validation passed {} (table accessible, {} rows)", stage,
tableData.count());
} catch (Exception e) {
throw new RuntimeException("Data validation failed " + stage, e);
}
@@ -997,7 +995,7 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
@ParameterizedTest
@MethodSource("testArgsPayloadUpgradeDowngrade")
public void testPayloadUpgradeDowngrade(String tableType, RecordMergeMode
recordMergeMode, String payloadType) throws Exception {
- LOG.info("Testing payload upgrade/downgrade for: {} (tableType: {},
recordMergeMode: {})",
+ log.info("Testing payload upgrade/downgrade for: {} (tableType: {},
recordMergeMode: {})",
payloadType, tableType, recordMergeMode);
// Load v6 fixture for this payload type
@@ -1092,6 +1090,6 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
assertEquals(6, readOptimizedDataAfterDowngrade.count(), "Read-optimized
query should return 6 records after downgrade: " + payloadType);
// will perform real time query and do dataframe validation
validateDataConsistency(expectedDataWithNewRecord, metaClientV6,
"dataframe validation after v9->v6 downgrade for " + payloadType);
- LOG.info("Completed payload upgrade/downgrade test for: {}", payloadType);
+ log.info("Completed payload upgrade/downgrade test for: {}", payloadType);
}
}