This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 095e80a9c256 refactor: Add Lombok annotations to hudi-utilities (Part 1) (#17823)
095e80a9c256 is described below
commit 095e80a9c256959d3802b0a138e48e72747940a9
Author: voonhous <[email protected]>
AuthorDate: Tue Jun 2 15:46:24 2026 +0800
refactor: Add Lombok annotations to hudi-utilities (Part 1) (#17823)
* refactor: Add Lombok annotations to hudi-utilities (Part 1)
---
hudi-utilities/pom.xml | 1 +
.../hudi/utilities/HiveIncrementalPuller.java | 53 ++++-----
.../org/apache/hudi/utilities/HoodieCleaner.java | 10 +-
.../apache/hudi/utilities/HoodieClusteringJob.java | 36 +++---
.../org/apache/hudi/utilities/HoodieCompactor.java | 43 ++++---
.../hudi/utilities/HoodieDataTableValidator.java | 36 +++---
.../hudi/utilities/HoodieDropPartitionsTool.java | 31 +++--
.../org/apache/hudi/utilities/HoodieIndexer.java | 36 +++---
.../utilities/HoodieMetadataTableValidator.java | 132 +++++++++------------
.../apache/hudi/utilities/HoodieRepairTool.java | 57 ++++-----
.../hudi/utilities/HoodieSnapshotExporter.java | 21 ++--
.../org/apache/hudi/utilities/HoodieTTLJob.java | 9 +-
.../org/apache/hudi/utilities/TableSizeStats.java | 56 ++++-----
.../org/apache/hudi/utilities/UtilHelpers.java | 59 +++++----
.../kafka/HoodieWriteCommitKafkaCallback.java | 18 ++-
.../pulsar/HoodieWriteCommitPulsarCallback.java | 14 +--
.../deser/KafkaAvroSchemaDeserializer.java | 5 +-
.../ingestion/HoodieIngestionException.java | 6 +-
.../ingestion/HoodieIngestionService.java | 18 ++-
.../hudi/utilities/multitable/ArchiveTask.java | 7 +-
.../hudi/utilities/multitable/ClusteringTask.java | 6 +-
.../multitable/HoodieMultiTableServicesMain.java | 12 +-
.../multitable/MultiTableServiceUtils.java | 7 +-
.../hudi/utilities/perf/TimelineServerPerf.java | 9 +-
.../utilities/schema/DelegatingSchemaProvider.java | 10 +-
.../utilities/schema/FilebasedSchemaProvider.java | 7 +-
.../hudi/utilities/schema/HiveSchemaProvider.java | 18 +--
.../utilities/schema/SchemaRegistryProvider.java | 8 +-
.../utilities/schema/SimpleSchemaProvider.java | 7 +-
.../DeleteSupportSchemaPostProcessor.java | 8 +-
.../DropColumnSchemaPostProcessor.java | 8 +-
.../utilities/transform/ChainedTransformer.java | 8 +-
.../utilities/transform/FlatteningTransformer.java | 7 +-
.../transform/SqlFileBasedTransformer.java | 12 +-
.../transform/SqlQueryBasedTransformer.java | 10 +-
35 files changed, 357 insertions(+), 428 deletions(-)
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 221eb7c7793b..5f8c620b5291 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -234,6 +234,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
+ <scope>provided</scope>
</dependency>
<!-- Fasterxml -->
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index 2510edce72a8..538253c9f60c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -28,13 +28,12 @@ import
org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.stringtemplate.v4.ST;
import java.io.File;
@@ -62,10 +61,9 @@ import static
org.apache.hudi.io.util.FileIOUtils.readAsUTFString;
* - Only the source table can be incrementally pulled (usually the largest
table) - The incrementally pulled table
* can't be referenced more than once.
*/
+@Slf4j
public class HiveIncrementalPuller {
- private static final Logger LOG =
LoggerFactory.getLogger(HiveIncrementalPuller.class);
-
public static class Config implements Serializable {
@Parameter(names = {"--hiveUrl"})
@@ -133,15 +131,14 @@ public class HiveIncrementalPuller {
incrementalSQL = scanner.useDelimiter("\\Z").next();
}
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
- LOG.error("Incremental SQL does not have " + config.sourceDb + "." +
config.sourceTable
- + ", which means its pulling from a different table. Fencing this
from happening.");
+ log.error("Incremental SQL does not have {}.{}, which means its pulling
from a different table. Fencing this from happening.",
+ config.sourceDb, config.sourceTable);
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." +
config.sourceTable);
}
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
- LOG.error("Incremental SQL : " + incrementalSQL
- + " does not contain `_hoodie_commit_time` > '%s'. Please add "
- + "this clause for incremental to work properly.");
+ log.error("Incremental SQL : {} does not contain `_hoodie_commit_time` >
'%s'. Please add this clause for incremental to work properly.",
+ incrementalSQL);
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have clause `_hoodie_commit_time` > '%s',
which "
+ "means its not pulling incrementally");
@@ -157,14 +154,14 @@ public class HiveIncrementalPuller {
try {
if (config.fromCommitTime == null) {
config.fromCommitTime = inferCommitTime(fs);
- LOG.info("FromCommitTime inferred as " + config.fromCommitTime);
+ log.info("FromCommitTime inferred as {}", config.fromCommitTime);
}
- LOG.info("FromCommitTime - " + config.fromCommitTime);
+ log.info("FromCommitTime - {}", config.fromCommitTime);
String sourceTableLocation = getTableLocation(config.sourceDb,
config.sourceTable);
String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation);
if (lastCommitTime == null) {
- LOG.info("Nothing to pull. However we will continue to create a empty
table");
+ log.info("Nothing to pull. However we will continue to create a empty
table");
lastCommitTime = config.fromCommitTime;
}
@@ -182,9 +179,9 @@ public class HiveIncrementalPuller {
initHiveBeelineProperties(stmt);
executeIncrementalSQL(tempDbTable, tempDbTablePath, stmt);
- LOG.info("Finished HoodieReader execution");
+ log.info("Finished HoodieReader execution");
} catch (SQLException e) {
- LOG.error("Exception when executing SQL", e);
+ log.error("Exception when executing SQL", e);
throw new IOException("Could not scan " + config.sourceTable + "
incrementally", e);
} finally {
try {
@@ -192,14 +189,14 @@ public class HiveIncrementalPuller {
stmt.close();
}
} catch (SQLException e) {
- LOG.error("Could not close the resultSet opened ", e);
+ log.error("Could not close the resultSet opened ", e);
}
try {
if (this.connection != null) {
this.connection.close();
}
} catch (SQLException e) {
- LOG.error("Could not close the JDBC connection", e);
+ log.error("Could not close the JDBC connection", e);
} finally {
this.connection = null;
}
@@ -229,7 +226,7 @@ public class HiveIncrementalPuller {
}
private void initHiveBeelineProperties(Statement stmt) throws SQLException {
- LOG.info("Setting up Hive JDBC Session with properties");
+ log.info("Setting up Hive JDBC Session with properties");
// set the queue
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName,
stmt);
// Set the inputFormat to HoodieCombineHiveInputFormat
@@ -247,18 +244,18 @@ public class HiveIncrementalPuller {
}
private boolean deleteHDFSPath(FileSystem fs, String path) throws
IOException {
- LOG.info("Deleting path " + path);
+ log.info("Deleting path {}", path);
return fs.delete(new Path(path), true);
}
private void executeStatement(String sql, Statement stmt) throws
SQLException {
- LOG.info("Executing: " + sql);
+ log.info("Executing: {}", sql);
stmt.execute(sql);
}
private String inferCommitTime(FileSystem fs) throws IOException {
- LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie
table " + config.targetDb + "."
- + config.targetTable);
+ log.info("FromCommitTime not specified. Trying to infer it from Hoodie
table {}.{}",
+ config.targetDb, config.targetTable);
String targetDataLocation = getTableLocation(config.targetDb,
config.targetTable);
return scanForCommitTime(fs, targetDataLocation);
}
@@ -272,7 +269,7 @@ public class HiveIncrementalPuller {
resultSet = stmt.executeQuery("describe formatted `" + db + "." + table
+ "`");
while (resultSet.next()) {
if (resultSet.getString(1).trim().equals("Location:")) {
- LOG.info("Inferred table location for " + db + "." + table + " as "
+ resultSet.getString(2));
+ log.info("Inferred table location for {}.{} as {}", db, table,
resultSet.getString(2));
return resultSet.getString(2);
}
}
@@ -287,7 +284,7 @@ public class HiveIncrementalPuller {
resultSet.close();
}
} catch (SQLException e) {
- LOG.error("Could not close the resultSet opened ", e);
+ log.error("Could not close the resultSet opened ", e);
}
}
return null;
@@ -314,7 +311,7 @@ public class HiveIncrementalPuller {
private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime)
throws IOException {
Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable
+ "__" + config.sourceTable);
if (!fs.exists(targetBaseDirPath)) {
- LOG.info("Creating " + targetBaseDirPath + " with permission
drwxrwxrwx");
+ log.info("Creating {} with permission drwxrwxrwx", targetBaseDirPath);
boolean result =
FileSystem.mkdirs(fs, targetBaseDirPath, new
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
if (!result) {
@@ -329,7 +326,7 @@ public class HiveIncrementalPuller {
throw new HoodieException("Could not delete existing " + targetPath);
}
}
- LOG.info("Creating " + targetPath + " with permission drwxrwxrwx");
+ log.info("Creating {} with permission drwxrwxrwx", targetPath);
return FileSystem.mkdirs(fs, targetBaseDirPath, new
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
@@ -341,17 +338,17 @@ public class HiveIncrementalPuller {
.findInstantsAfter(config.fromCommitTime,
config.maxCommits).getInstantsAsStream().map(HoodieInstant::requestedTime)
.collect(Collectors.toList());
if (commitsToSync.isEmpty()) {
- LOG.info("Nothing to sync. All commits in {} are {} and from commit time
is {}", config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
+ log.info("Nothing to sync. All commits in {} are {} and from commit time
is {}", config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants(), config.fromCommitTime);
return null;
}
- LOG.info("Syncing commits {}", commitsToSync);
+ log.info("Syncing commits {}", commitsToSync);
return commitsToSync.get(commitsToSync.size() - 1);
}
private Connection getConnection() throws SQLException {
if (connection == null) {
- LOG.info("Getting Hive Connection to {}", config.hiveJDBCUrl);
+ log.info("Getting Hive Connection to {}", config.hiveJDBCUrl);
this.connection = DriverManager.getConnection(config.hiveJDBCUrl,
config.hiveUsername, config.hivePassword);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index 63ec3354410d..90b0c7dc1515 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -27,19 +27,17 @@ import org.apache.hudi.exception.HoodieException;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+@Slf4j
public class HoodieCleaner {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieCleaner.class);
-
/**
* Config for Cleaner.
*/
@@ -63,7 +61,7 @@ public class HoodieCleaner {
this.cfg = cfg;
this.jssc = jssc;
this.props = props;
- LOG.info("Creating Cleaner with configs : " + props.toString());
+ log.info("Creating Cleaner with configs : {}", props.toString());
}
public void run() {
@@ -124,6 +122,6 @@ public class HoodieCleaner {
SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc,
exitCode);
}
- LOG.info("Cleaner ran successfully");
+ log.info("Cleaner ran successfully");
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index c378bce9026c..67e35aea7831 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -37,9 +37,8 @@ import org.apache.hudi.exception.HoodieException;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
@@ -53,9 +52,9 @@ import static
org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+@Slf4j
public class HoodieClusteringJob {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieClusteringJob.class);
private final Config cfg;
private final TypedProperties props;
private final JavaSparkContext jsc;
@@ -168,7 +167,7 @@ public class HoodieClusteringJob {
if (result != 0) {
throw new HoodieException(resultMsg + " failed");
}
- LOG.info(resultMsg + " success");
+ log.info("{} success", resultMsg);
jsc.stop();
}
@@ -187,28 +186,28 @@ public class HoodieClusteringJob {
return UtilHelpers.retry(retry, () -> {
switch (cfg.runningMode.toLowerCase()) {
case SCHEDULE: {
- LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ log.info("Running Mode: [{}]; Do schedule", SCHEDULE);
Option<String> instantTime = doSchedule(jsc);
int result = instantTime.isPresent() ? 0 : -1;
if (result == 0) {
- LOG.info("The schedule instant time is " + instantTime.get());
+ log.info("The schedule instant time is {}", instantTime.get());
}
return result;
}
case SCHEDULE_AND_EXECUTE: {
- LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ log.info("Running Mode: [{}]", SCHEDULE_AND_EXECUTE);
return doScheduleAndCluster(jsc);
}
case EXECUTE: {
- LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
+ log.info("Running Mode: [{}]; Do cluster", EXECUTE);
return doCluster(jsc);
}
case PURGE_PENDING_INSTANT: {
- LOG.info("Running Mode: [" + PURGE_PENDING_INSTANT + "];");
+ log.info("Running Mode: [{}];", PURGE_PENDING_INSTANT);
return doPurgePendingInstant(jsc);
}
default: {
- LOG.error("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ log.error("Unsupported running mode [{}], quit the job directly",
cfg.runningMode);
return -1;
}
}
@@ -224,10 +223,9 @@ public class HoodieClusteringJob {
metaClient.getActiveTimeline().getFirstPendingClusterInstant();
if (firstClusteringInstant.isPresent()) {
cfg.clusteringInstantTime =
firstClusteringInstant.get().requestedTime();
- LOG.info("Found the earliest scheduled clustering instant which will
be executed: "
- + cfg.clusteringInstantTime);
+ log.info("Found the earliest scheduled clustering instant which will
be executed: {}", cfg.clusteringInstantTime);
} else {
- LOG.info("There is no scheduled clustering in the table.");
+ log.info("There is no scheduled clustering in the table.");
return 0;
}
}
@@ -255,7 +253,7 @@ public class HoodieClusteringJob {
}
private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
- LOG.info("Step 1: Do schedule");
+ log.info("Step 1: Do schedule");
metaClient = HoodieTableMetaClient.reload(metaClient);
String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
@@ -265,19 +263,19 @@ public class HoodieClusteringJob {
Option<HoodieInstant> staleInstant =
TableServiceUtils.findStaleInflightInstant(
metaClient, HoodieTimeline.CLUSTERING_ACTION,
cfg.maxProcessingTimeMs);
if (staleInstant.isPresent()) {
- LOG.info("Found failed clustering instant at : " +
staleInstant.get() + "; Will rollback the failed clustering and re-trigger
again.");
+ log.info("Found failed clustering instant at : {}; Will rollback the
failed clustering and re-trigger again.", staleInstant.get());
instantTime = Option.of(staleInstant.get().requestedTime());
}
}
instantTime = instantTime.isPresent() ? instantTime : doSchedule(client);
if (!instantTime.isPresent()) {
- LOG.info("Couldn't generate cluster plan");
+ log.info("Couldn't generate cluster plan");
return -1;
}
- LOG.info("The schedule instant time is " + instantTime.get());
- LOG.info("Step 2: Do cluster");
+ log.info("The schedule instant time is {}", instantTime.get());
+ log.info("Step 2: Do cluster");
Option<HoodieCommitMetadata> metadata =
client.cluster(instantTime.get()).getCommitMetadata();
clean(client);
return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
@@ -345,7 +343,7 @@ public class HoodieClusteringJob {
metaClient.reloadActiveTimeline();
if (metaClient.getActiveTimeline().filterInflightsAndRequested()
.containsInstant(instant.requestedTime())) {
- LOG.info("Rolling back expired clustering instant {}",
instant.requestedTime());
+ log.info("Rolling back expired clustering instant {}",
instant.requestedTime());
client.rollback(instant.requestedTime());
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 2701ce7c9a94..fe64c95a364f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -38,20 +38,19 @@ import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionS
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+@Slf4j
public class HoodieCompactor {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieCompactor.class);
public static final String EXECUTE = "execute";
public static final String SCHEDULE = "schedule";
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
@@ -75,11 +74,12 @@ public class HoodieCompactor {
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
// add default lock config options if MDT is enabled.
- UtilHelpers.addLockOptions(cfg.basePath,
this.metaClient.getBasePath().toUri().getScheme(), this.props);
+ UtilHelpers.addLockOptions(cfg.basePath,
this.metaClient.getBasePath().toUri().getScheme(), this.props);
}
}
public static class Config implements Serializable {
+
@Parameter(names = {"--base-path", "-sp"}, description = "Base path for
the table", required = true)
public String basePath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
@@ -196,7 +196,7 @@ public class HoodieCompactor {
if (ret != 0) {
throw new HoodieException("Fail to run compaction for " + cfg.tableName
+ ", return code: " + ret);
}
- LOG.info("Success to run compaction for " + cfg.tableName);
+ log.info("Success to run compaction for {}", cfg.tableName);
jsc.stop();
}
@@ -204,29 +204,29 @@ public class HoodieCompactor {
this.fs = HadoopFSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
// need to do validate in case that users call compact() directly without
setting cfg.runningMode
validateRunningMode(cfg);
- LOG.info(cfg.toString());
+ log.info(cfg.toString());
int ret = UtilHelpers.retry(retry, () -> {
switch (cfg.runningMode.toLowerCase()) {
case SCHEDULE: {
- LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ log.info("Running Mode: [{}] Do schedule", SCHEDULE);
Option<String> instantTime = doSchedule(jsc);
int result = instantTime.isPresent() ? 0 : -1;
if (result == 0) {
- LOG.info("The schedule instant time is " + instantTime.get());
+ log.info("The schedule instant time is {}", instantTime.get());
}
return result;
}
case SCHEDULE_AND_EXECUTE: {
- LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ log.info("Running Mode: [{}]", SCHEDULE_AND_EXECUTE);
return doScheduleAndCompact(jsc);
}
case EXECUTE: {
- LOG.info("Running Mode: [" + EXECUTE + "]; Do compaction");
+ log.info("Running Mode: [{}]; Do compaction", EXECUTE);
return doCompact(jsc);
}
default: {
- LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ log.info("Unsupported running mode [{}], quit the job directly",
cfg.runningMode);
return -1;
}
}
@@ -235,7 +235,7 @@ public class HoodieCompactor {
}
private Integer doScheduleAndCompact(JavaSparkContext jsc) throws Exception {
- LOG.info("Step 1: Do schedule");
+ log.info("Step 1: Do schedule");
metaClient = HoodieTableMetaClient.reload(metaClient);
Option<String> instantTime = Option.empty();
@@ -243,20 +243,20 @@ public class HoodieCompactor {
Option<HoodieInstant> staleInstant =
TableServiceUtils.findStaleInflightInstant(
metaClient, HoodieTimeline.COMPACTION_ACTION,
cfg.maxProcessingTimeMs);
if (staleInstant.isPresent()) {
- LOG.info("Found failed compaction instant at : " + staleInstant.get()
+ "; Will rollback the failed compaction and re-trigger again.");
+ log.info("Found failed compaction instant at : {}; Will rollback the
failed compaction and re-trigger again.", staleInstant.get());
instantTime = Option.of(staleInstant.get().requestedTime());
}
}
instantTime = instantTime.isPresent() ? instantTime : doSchedule(jsc);
if (!instantTime.isPresent()) {
- LOG.error("Couldn't do schedule");
+ log.error("Couldn't do schedule");
return -1;
}
cfg.compactionInstantTime = instantTime.get();
- LOG.info("The schedule instant time is {}", instantTime.get());
- LOG.info("Step 2: Do compaction");
+ log.info("The schedule instant time is {}", instantTime.get());
+ log.info("Step 2: Do compaction");
return doCompact(jsc);
}
@@ -278,10 +278,10 @@ public class HoodieCompactor {
} else {
schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
}
- LOG.info("Schema --> : " + schemaStr);
+ log.info("Schema --> : {}", schemaStr);
try (SparkRDDWriteClient<HoodieRecordPayload> client =
- UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr,
cfg.parallelism, Option.empty(), props)) {
+ UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr,
cfg.parallelism, Option.empty(), props)) {
// If no compaction instant is provided by --instant-time, find the
earliest scheduled compaction
// instant from the active timeline
if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
@@ -289,10 +289,9 @@ public class HoodieCompactor {
Option<HoodieInstant> firstCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
if (firstCompactionInstant.isPresent()) {
cfg.compactionInstantTime =
firstCompactionInstant.get().requestedTime();
- LOG.info("Found the earliest scheduled compaction instant which will
be executed: "
- + cfg.compactionInstantTime);
+ log.info("Found the earliest scheduled compaction instant which will
be executed: {}", cfg.compactionInstantTime);
} else {
- LOG.info("There is no scheduled compaction in the table.");
+ log.info("There is no scheduled compaction in the table.");
return 0;
}
}
@@ -305,7 +304,7 @@ public class HoodieCompactor {
private Option<String> doSchedule(JavaSparkContext jsc) {
try (SparkRDDWriteClient client =
- UtilHelpers.createHoodieClient(jsc, cfg.basePath, "",
cfg.parallelism, Option.of(cfg.strategyClassName), props)) {
+ UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
Option.of(cfg.strategyClassName), props)) {
return client.scheduleCompaction(Option.empty());
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
index 521f94bbafc5..f34fe8650401 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
@@ -38,10 +38,9 @@ import org.apache.hudi.table.repair.RepairUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
@@ -98,10 +97,10 @@ import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
* --min-validate-interval-seconds 60
* ```
*/
+@Slf4j
public class HoodieDataTableValidator implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieDataTableValidator.class);
// Spark context
private transient JavaSparkContext jsc;
@@ -253,7 +252,7 @@ public class HoodieDataTableValidator implements
Serializable {
try {
validator.run();
} catch (Throwable throwable) {
- LOG.error("Fail to do hoodie Data table validation for " +
validator.cfg, throwable);
+ log.error("Fail to do hoodie Data table validation for " +
validator.cfg, throwable);
} finally {
jsc.stop();
}
@@ -261,12 +260,12 @@ public class HoodieDataTableValidator implements
Serializable {
public void run() {
try {
- LOG.info(cfg.toString());
+ log.info(cfg.toString());
if (cfg.continuous) {
- LOG.info(" ****** do hoodie data table validation in CONTINUOUS mode
******");
+ log.info(" ****** do hoodie data table validation in CONTINUOUS mode
******");
doHoodieDataTableValidationContinuous();
} else {
- LOG.info(" ****** do hoodie data table validation once ******");
+ log.info(" ****** do hoodie data table validation once ******");
doHoodieDataTableValidationOnce();
}
} catch (Exception e) {
@@ -283,7 +282,7 @@ public class HoodieDataTableValidator implements
Serializable {
try {
doDataTableValidation();
} catch (HoodieValidationException e) {
- LOG.error("Metadata table validation failed to
HoodieValidationException", e);
+ log.error("Metadata table validation failed to
HoodieValidationException", e);
if (!cfg.ignoreFailed) {
throw e;
}
@@ -320,9 +319,8 @@ public class HoodieDataTableValidator implements
Serializable {
}).collect(Collectors.toList());
if (!danglingFilePaths.isEmpty() && danglingFilePaths.size() > 0) {
- LOG.error("Data table validation failed due to dangling files count "
- + danglingFilePaths.size() + ", found before active timeline");
- danglingFilePaths.forEach(entry -> LOG.error("Dangling file: " +
entry.toString()));
+ log.error("Data table validation failed due to dangling files count
{}, found before active timeline", danglingFilePaths.size());
+ danglingFilePaths.forEach(entry -> log.error("Dangling file: " +
entry.toString()));
finalResult = false;
if (!cfg.ignoreFailed) {
throw new HoodieValidationException(
@@ -354,8 +352,8 @@ public class HoodieDataTableValidator implements
Serializable {
}, hoodieInstants.size()).stream().collect(Collectors.toList());
if (!danglingFiles.isEmpty()) {
- LOG.error("Data table validation failed due to extra files found for
completed commits {}", danglingFiles.size());
- danglingFiles.forEach(entry -> LOG.error("Dangling file: {}",
entry));
+ log.error("Data table validation failed due to extra files found for
completed commits {}", danglingFiles.size());
+ danglingFiles.forEach(entry -> log.error("Dangling file: {}",
entry));
finalResult = false;
if (!cfg.ignoreFailed) {
throw new HoodieValidationException("Data table validation failed
due to dangling files " + danglingFiles.size());
@@ -363,16 +361,16 @@ public class HoodieDataTableValidator implements
Serializable {
}
}
} catch (Exception e) {
- LOG.error("Data table validation failed", e);
+ log.error("Data table validation failed", e);
if (!cfg.ignoreFailed) {
throw new HoodieValidationException("Data table validation failed due
to " + e.getMessage(), e);
}
}
if (finalResult) {
- LOG.info("Data table validation succeeded.");
+ log.info("Data table validation succeeded.");
} else {
- LOG.error("Data table validation failed.");
+ log.error("Data table validation failed.");
}
}
@@ -389,12 +387,12 @@ public class HoodieDataTableValidator implements
Serializable {
long toSleepMs = cfg.minValidateIntervalSeconds * 1000 -
(System.currentTimeMillis() - start);
if (toSleepMs > 0) {
- LOG.info("Last validate ran less than min validate interval: " +
cfg.minValidateIntervalSeconds + " s, sleep: "
- + toSleepMs + " ms.");
+ log.info("Last validate ran less than min validate interval: {}
s, sleep: {} ms.",
+ cfg.minValidateIntervalSeconds, toSleepMs);
Thread.sleep(toSleepMs);
}
} catch (HoodieValidationException e) {
- LOG.error("Shutting down AsyncDataTableValidateService due to
HoodieValidationException", e);
+ log.error("Shutting down AsyncDataTableValidateService due to
HoodieValidationException", e);
if (!cfg.ignoreFailed) {
throw e;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index 9e2e7d035b9a..b9dadae7d87a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -38,13 +38,12 @@ import org.apache.hudi.table.HoodieSparkTable;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
@@ -101,10 +100,10 @@ import scala.Tuple2;
*
* Also you can use --help to find more configs to use.
*/
+@Slf4j
public class HoodieDropPartitionsTool implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieDropPartitionsTool.class);
// Spark context
private final transient JavaSparkContext jsc;
// config
@@ -282,7 +281,7 @@ public class HoodieDropPartitionsTool implements
Serializable {
try {
tool.run();
} catch (Throwable throwable) {
- LOG.error("Fail to run deleting table partitions for " + cfg, throwable);
+ log.error("Fail to run deleting table partitions for {}", cfg,
throwable);
} finally {
jsc.stop();
}
@@ -290,21 +289,21 @@ public class HoodieDropPartitionsTool implements
Serializable {
public void run() {
try {
- LOG.info(cfg.toString());
+ log.info(cfg.toString());
Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
switch (mode) {
case DELETE:
- LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete mode
****** ");
+ log.info(" ****** The Hoodie Drop Partitions Tool is in delete mode
****** ");
doDeleteTablePartitions();
syncToHiveIfNecessary();
break;
case DRY_RUN:
- LOG.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode
****** ");
+ log.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode
****** ");
dryRun();
break;
default:
- LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ log.info("Unsupported running mode [{}], quit the job directly",
cfg.runningMode);
}
} catch (Exception e) {
throw new HoodieException("Unable to delete table partitions in " +
cfg.basePath, e);
@@ -368,19 +367,17 @@ public class HoodieDropPartitionsTool implements
Serializable {
}
private void syncHive(HiveSyncConfig hiveSyncConfig) {
- LOG.info("Syncing target hoodie table with hive table("
- +
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME)
- + "). Hive metastore URL :"
- + hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_URL)
- + ", basePath :" + cfg.basePath);
- LOG.info("Hive Sync Conf => " + hiveSyncConfig);
+ log.info("Syncing target hoodie table with hive table({}). Hive metastore
URL :{}, basePath :{}",
+
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+ hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_URL),
cfg.basePath);
+ log.info("Hive Sync Conf => {}", hiveSyncConfig);
FileSystem fs = HadoopFSUtils.getFs(cfg.basePath,
jsc.hadoopConfiguration());
HiveConf hiveConf = new HiveConf();
if (!StringUtils.isNullOrEmpty(cfg.hiveHMSUris)) {
hiveConf.set("hive.metastore.uris", cfg.hiveHMSUris);
}
hiveConf.addResource(fs.getConf());
- LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
+ log.info("Hive Conf => {}", hiveConf.getAllProperties());
try (HiveSyncTool hiveSyncTool = new
HiveSyncTool(hiveSyncConfig.getProps(), hiveConf)) {
hiveSyncTool.syncHoodieTable();
}
@@ -392,9 +389,9 @@ public class HoodieDropPartitionsTool implements
Serializable {
* @param partitionToReplaceFileIds
*/
private void printDeleteFilesInfo(Map<String, List<String>>
partitionToReplaceFileIds) {
- LOG.info("Data files and partitions to delete : ");
+ log.info("Data files and partitions to delete : ");
for (Map.Entry<String, List<String>> entry :
partitionToReplaceFileIds.entrySet()) {
- LOG.info(String.format("Partitions : %s, corresponding data file IDs :
%s", entry.getKey(), entry.getValue()));
+ log.info("Partitions : {}, corresponding data file IDs : {}",
entry.getKey(), entry.getValue());
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index b3d743693eec..2c7bab98900f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -35,10 +35,9 @@ import org.apache.hudi.metadata.MetadataPartitionType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
@@ -87,9 +86,9 @@ import static
org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
* hoodie.write.concurrency.mode=optimistic_concurrency_control
*
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
*/
+@Slf4j
public class HoodieIndexer {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieIndexer.class);
static final String DROP_INDEX = "dropindex";
private final HoodieIndexer.Config cfg;
@@ -165,21 +164,21 @@ public class HoodieIndexer {
if (result != 0) {
throw new HoodieException(resultMsg + " failed");
}
- LOG.info(resultMsg + " success");
+ log.info("{} success", resultMsg);
jsc.stop();
}
public int start(int retry) {
// indexing should be done only if metadata is enabled
if (!props.getBoolean(HoodieMetadataConfig.ENABLE.key())) {
- LOG.error(String.format("Metadata is not enabled. Please set %s to
true.", HoodieMetadataConfig.ENABLE.key()));
+ log.error("Metadata is not enabled. Please set {} to true.",
HoodieMetadataConfig.ENABLE.key());
return -1;
}
// all inflight or completed metadata partitions have already been
initialized
// so enable corresponding indexes in the props so that they're not deleted
Set<String> initializedMetadataPartitions =
getInflightAndCompletedMetadataPartitions(metaClient.getTableConfig());
- LOG.info("Setting props for: " + initializedMetadataPartitions);
+ log.info("Setting props for: {}", initializedMetadataPartitions);
initializedMetadataPartitions.forEach(p -> {
if (PARTITION_NAME_COLUMN_STATS.equals(p)) {
props.setProperty(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
@@ -195,28 +194,28 @@ public class HoodieIndexer {
return UtilHelpers.retry(retry, () -> {
switch (cfg.runningMode.toLowerCase()) {
case SCHEDULE: {
- LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ log.info("Running Mode: [{}]; Do schedule", SCHEDULE);
Option<String> instantTime = scheduleIndexing(jsc);
int result = instantTime.isPresent() ? 0 : -1;
if (result == 0) {
- LOG.info("The schedule instant time is " + instantTime.get());
+ log.info("The schedule instant time is {}", instantTime.get());
}
return result;
}
case SCHEDULE_AND_EXECUTE: {
- LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ log.info("Running Mode: [{}]", SCHEDULE_AND_EXECUTE);
return scheduleAndRunIndexing(jsc);
}
case EXECUTE: {
- LOG.info("Running Mode: [" + EXECUTE + "];");
+ log.info("Running Mode: [{}];", EXECUTE);
return runIndexing(jsc);
}
case DROP_INDEX: {
- LOG.info("Running Mode: [" + DROP_INDEX + "];");
+ log.info("Running Mode: [{}];", DROP_INDEX);
return dropIndex(jsc);
}
default: {
- LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ log.info("Unsupported running mode [{}], quit the job directly",
cfg.runningMode);
return -1;
}
}
@@ -248,7 +247,7 @@ public class HoodieIndexer {
Option<String> indexingInstant = client.scheduleIndexing(partitionTypes,
Collections.emptyList());
if (!indexingInstant.isPresent()) {
- LOG.error("Scheduling of index action did not return any instant.");
+ log.error("Scheduling of index action did not return any instant.");
}
return indexingInstant;
}
@@ -264,7 +263,7 @@ public class HoodieIndexer {
Set<String> requestedIndexPartitionPaths =
partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
requestedIndexPartitionPaths.retainAll(indexedMetadataPartitions);
if (!requestedIndexPartitionPaths.isEmpty()) {
- LOG.error("Following indexes already built: " +
requestedIndexPartitionPaths);
+ log.error("Following indexes already built: {}",
requestedIndexPartitionPaths);
return true;
}
return false;
@@ -286,8 +285,7 @@ public class HoodieIndexer {
.firstInstant();
if (earliestPendingIndexInstant.isPresent()) {
cfg.indexInstantTime =
earliestPendingIndexInstant.get().requestedTime();
- LOG.info("Found the earliest scheduled indexing instant which will
be executed: "
- + cfg.indexInstantTime);
+ log.info("Found the earliest scheduled indexing instant which will
be executed: {}", cfg.indexInstantTime);
} else {
throw new HoodieIndexException("There is no scheduled indexing in
the table.");
}
@@ -316,18 +314,18 @@ public class HoodieIndexer {
client.dropIndex(partitionTypes);
return 0;
} catch (Exception e) {
- LOG.error("Failed to drop index. ", e);
+ log.error("Failed to drop index. ", e);
return -1;
}
}
private boolean handleResponse(Option<HoodieIndexCommitMetadata>
commitMetadata) {
if (!commitMetadata.isPresent()) {
- LOG.error("Indexing failed as no commit metadata present.");
+ log.error("Indexing failed as no commit metadata present.");
return false;
}
List<HoodieIndexPartitionInfo> indexPartitionInfos =
commitMetadata.get().getIndexPartitionInfos();
- LOG.info("Indexing complete for partitions: {}",
indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList()));
+ log.info("Indexing complete for partitions: {}",
indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList()));
return isIndexBuiltForAllRequestedTypes(indexPartitionInfos);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index daf9370c3550..d6876bea6e69 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -97,6 +97,8 @@ import org.apache.hudi.utilities.util.BloomFilterData;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
@@ -105,8 +107,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -204,10 +204,10 @@ import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getLogFileColumnR
* --min-validate-interval-seconds 60
* ```
*/
+@Slf4j
public class HoodieMetadataTableValidator implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetadataTableValidator.class);
// Spark context
private transient JavaSparkContext jsc;
@@ -222,6 +222,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
private final String taskLabels;
+ @Getter
private final List<Throwable> throwables = new ArrayList<>();
public HoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
@@ -240,21 +241,13 @@ public class HoodieMetadataTableValidator implements
Serializable {
.build());
} catch (TableNotFoundException tbe) {
// Suppress the TableNotFound exception, table not yet created for a new
stream
- LOG.warn("Data table is not found. Skip current validation for: {}",
cfg.basePath);
+ log.warn("Data table is not found. Skip current validation for: {}",
cfg.basePath);
}
this.asyncMetadataTableValidateService = cfg.continuous ? Option.of(new
AsyncMetadataTableValidateService()) : Option.empty();
this.taskLabels = generateValidationTaskLabels();
}
- /**
- * Returns list of Throwable which were encountered during validation. This
method is useful
- * when ignoreFailed parameter is set to true.
- */
- public List<Throwable> getThrowables() {
- return throwables;
- }
-
/**
* Returns true if there is a validation failure encountered during
validation.
* This method is useful when ignoreFailed parameter is set to true.
@@ -514,7 +507,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
HoodieMetadataTableValidator validator = new
HoodieMetadataTableValidator(jsc, cfg);
validator.run();
} catch (Throwable throwable) {
- LOG.error("Fail to do hoodie metadata table validation for {}", cfg,
throwable);
+ log.error("Fail to do hoodie metadata table validation for {}", cfg,
throwable);
} finally {
jsc.stop();
}
@@ -522,17 +515,17 @@ public class HoodieMetadataTableValidator implements
Serializable {
public boolean run() {
if (!metaClientOpt.isPresent()) {
- LOG.warn("Data table is not available to read for now, skip current
validation for: {}", cfg.basePath);
+ log.warn("Data table is not available to read for now, skip current
validation for: {}", cfg.basePath);
return true;
}
boolean result = false;
try {
- LOG.info(cfg.toString());
+ log.info(cfg.toString());
if (cfg.continuous) {
- LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS
mode - {} ******", taskLabels);
+ log.info(" ****** do hoodie metadata table validation in CONTINUOUS
mode - {} ******", taskLabels);
doHoodieMetadataTableValidationContinuous();
} else {
- LOG.info(" ****** do hoodie metadata table validation once - {}
******", taskLabels);
+ log.info(" ****** do hoodie metadata table validation once - {}
******", taskLabels);
result = doHoodieMetadataTableValidationOnce();
}
return result;
@@ -554,7 +547,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
try {
return doMetadataTableValidation();
} catch (Throwable e) {
- LOG.error("Metadata table validation failed to HoodieValidationException
{}", taskLabels, e);
+ log.error("Metadata table validation failed to HoodieValidationException
{}", taskLabels, e);
if (!cfg.ignoreFailed) {
throw e;
}
@@ -577,7 +570,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
public boolean doMetadataTableValidation() {
boolean finalResult = true;
if (!metaClientOpt.isPresent()) {
- LOG.warn("Data table is not available to read for now, skip current
validation.");
+ log.warn("Data table is not available to read for now, skip current
validation.");
return true;
}
HoodieTableMetaClient metaClient = this.metaClientOpt.get();
@@ -619,7 +612,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
List<String> allPartitions = validatePartitions(engineContext, basePath,
metaClient);
if (allPartitions.isEmpty()) {
- LOG.warn("The result of getting all partitions is null or empty, skip
current validation. {}", taskLabels);
+ log.warn("The result of getting all partitions is null or empty, skip
current validation. {}", taskLabels);
return true;
}
@@ -632,10 +625,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
engineContext.parallelize(allPartitions,
allPartitions.size()).map(partitionPath -> {
try {
validateFilesInPartition(metadataTableBasedContext,
fsBasedContext, partitionPath, finalBaseFilesForCleaning);
- LOG.info("Metadata table validation succeeded for partition {}
(partition {})", partitionPath, taskLabels);
+ log.info("Metadata table validation succeeded for partition {}
(partition {})", partitionPath, taskLabels);
return Pair.<Boolean, HoodieValidationException>of(true, null);
} catch (HoodieValidationException e) {
- LOG.error("Metadata table validation failed for partition {} due
to HoodieValidationException (partition {})",
+ log.error("Metadata table validation failed for partition {} due
to HoodieValidationException (partition {})",
partitionPath, taskLabels, e);
if (!cfg.ignoreFailed) {
throw e;
@@ -672,7 +665,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
for (Pair<Boolean, HoodieValidationException> res : result) {
finalResult &= res.getKey();
if (res.getKey().equals(false)) {
- LOG.error("Metadata Validation failed for table: {}", cfg.basePath,
res.getValue());
+ log.error("Metadata Validation failed for table: {}", cfg.basePath,
res.getValue());
if (res.getRight() != null) {
throwables.add(res.getRight());
}
@@ -680,10 +673,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
}
if (finalResult) {
- LOG.info("Metadata table validation succeeded ({}).", taskLabels);
+ log.info("Metadata table validation succeeded ({}).", taskLabels);
return true;
} else {
- LOG.error("Metadata table validation failed ({}).", taskLabels);
+ log.error("Metadata table validation failed ({}).", taskLabels);
return false;
}
} catch (HoodieValidationException validationException) {
@@ -697,14 +690,14 @@ public class HoodieMetadataTableValidator implements
Serializable {
throw new HoodieValidationException("Unexpected spark failure",
sparkException);
}
} catch (Exception e) {
- LOG.warn("Error closing HoodieMetadataValidationContext, "
+ log.warn("Error closing HoodieMetadataValidationContext, "
+ "ignoring the error as the validation is successful.", e);
return true;
}
}
private void handleValidationException(HoodieValidationException e,
List<Pair<Boolean, HoodieValidationException>> result, String errorMsg) {
- LOG.error("{} for table: {} ", errorMsg, cfg.basePath, e);
+ log.error("{} for table: {} ", errorMsg, cfg.basePath, e);
if (!cfg.ignoreFailed) {
throw e;
}
@@ -725,7 +718,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
int finishedInstants =
mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
if (finishedInstants == 0) {
if
(metaClientOpt.get().getCommitsTimeline().filterCompletedInstants().countInstants()
== 0) {
- LOG.info("There is no completed commit in both metadata table and
corresponding data table: {}", taskLabels);
+ log.info("There is no completed commit in both metadata table and
corresponding data table: {}", taskLabels);
return false;
} else {
throw new HoodieValidationException("There is no completed instant
for metadata table: " + cfg.basePath);
@@ -734,10 +727,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
return true;
} catch (TableNotFoundException tbe) {
// Suppress the TableNotFound exception if Metadata table is not
available to read for now
- LOG.warn("Metadata table is not found for table: {}. Skip current
validation.", cfg.basePath);
+ log.warn("Metadata table is not found for table: {}. Skip current
validation.", cfg.basePath);
return false;
} catch (Exception ex) {
- LOG.warn("Metadata table is not available to read for now for table: {},
", cfg.basePath, ex);
+ log.warn("Metadata table is not available to read for now for table: {},
", cfg.basePath, ex);
return false;
}
}
@@ -776,7 +769,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
Option<HoodieInstant> lastInstant =
completedTimeline.lastInstant();
if (lastInstant.isPresent()
&&
InstantComparison.compareTimestamps(partitionCreationTimeOpt.get(),
GREATER_THAN, lastInstant.get().requestedTime())) {
- LOG.info("Ignoring additional partition {}, as it was
deduced to be part of a "
+ log.info("Ignoring additional partition {}, as it was
deduced to be part of a "
+ "latest completed commit which was inflight when FS
based listing was polled.", partitionFromMDT);
actualAdditionalPartitionsInMDT.remove(partitionFromMDT);
}
@@ -804,7 +797,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
}).collect(Collectors.toList());
additionalFromFS.removeAll(emptyPartitions);
if (additionalFromFS.isEmpty()) {
- LOG.info("All out of sync partitions turned out to be empty {}",
emptyPartitions);
+ log.info("All out of sync partitions turned out to be empty {}",
emptyPartitions);
misMatch.set(false);
} else {
misMatch.set(true);
@@ -819,7 +812,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
+ toStringWithThreshold(actualAdditionalPartitionsInMDT,
cfg.logDetailMaxLength)
+ "\".\n All " + allPartitionPathsFromFS.size() + " partitions
from FS listing "
+ toStringWithThreshold(allPartitionPathsFromFS,
cfg.logDetailMaxLength);
- LOG.error(message);
+ log.error(message);
throw new HoodieValidationException(message);
}
}
@@ -1252,10 +1245,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
if (countKeyFromTable != countKeyFromRecordIndex) {
String message = String.format("Validation of record index count failed:
%s entries from record index metadata, %s keys from the data table: %s",
countKeyFromRecordIndex, countKeyFromTable, cfg.basePath);
- LOG.error(message);
+ log.error(message);
throw new HoodieValidationException(message);
} else {
- LOG.info("Validation of record index count succeeded: {} entries. Table:
{}", countKeyFromRecordIndex, cfg.basePath);
+ log.info("Validation of record index count succeeded: {} entries. Table:
{}", countKeyFromRecordIndex, cfg.basePath);
}
}
@@ -1338,10 +1331,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
+ "%s keys (total %s) from the data table have wrong location in
record index "
+ "metadata. Table: %s Sample mismatches: %s",
diffCount, countKey, cfg.basePath, String.join(";",
result.getRight()));
- LOG.error(message);
+ log.error(message);
throw new HoodieValidationException(message);
} else {
- LOG.info("Validation of record index content succeeded: {} entries.
Table: {}", countKey, cfg.basePath);
+ log.info("Validation of record index content succeeded: {} entries.
Table: {}", countKey, cfg.basePath);
}
}
@@ -1469,10 +1462,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
if (mismatch) {
String message = String.format("Validation of %s for partition %s failed
for table: %s. %s",
label, partitionPath, cfg.basePath, errorDetails);
- LOG.error(message);
+ log.error(message);
throw new HoodieValidationException(message);
} else {
- LOG.info("Validation of {} succeeded for partition {} for table: {}",
label, partitionPath, cfg.basePath);
+ log.info("Validation of {} succeeded for partition {} for table: {}",
label, partitionPath, cfg.basePath);
}
}
@@ -1522,7 +1515,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
mismatch = true;
break;
} else {
- LOG.info("There are uncommitted log files in the latest file slices
but the committed log files match: {} {}", fileSlice1, fileSlice2);
+ log.info("There are uncommitted log files in the latest file slices
but the committed log files match: {} {}", fileSlice1, fileSlice2);
}
}
}
@@ -1530,10 +1523,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
if (mismatch) {
String message = String.format("Validation of %s for partition %s failed
for table: %s. %s",
label, partitionPath, cfg.basePath, errorDetails);
- LOG.error(message);
+ log.error(message);
throw new HoodieValidationException(message);
} else {
- LOG.info("Validation of {} succeeded for partition {} for table: {}",
label, partitionPath, cfg.basePath);
+ log.info("Validation of {} succeeded for partition {} for table: {}",
label, partitionPath, cfg.basePath);
}
}
@@ -1565,16 +1558,16 @@ public class HoodieMetadataTableValidator implements
Serializable {
// truncate start instant since range is not
Set<String> missingCommits =
nonActiveInstantTimes.stream().filter(instant ->
!archivedInstants.contains(instant)).collect(Collectors.toSet());
if (!missingCommits.isEmpty()) {
- LOG.warn("File slices in file system belong to missing commits: {}",
String.join(",", missingCommits));
+ log.warn("File slices in file system belong to missing commits: {}",
String.join(",", missingCommits));
activeTimeline.getRollbackTimeline().getInstantsAsStream().forEach(instant -> {
HoodieInstant requestedInstant =
metaClient.getInstantGenerator().getRollbackRequestedInstant(instant);
try {
HoodieRollbackPlan rollbackPlan =
activeTimeline.readInstantContent(requestedInstant, HoodieRollbackPlan.class);
if
(missingCommits.contains(rollbackPlan.getInstantToRollback().getCommitTime())) {
- LOG.warn("Missing commit ({}) is part of rollback plan: {}",
rollbackPlan.getInstantToRollback().getCommitTime(), rollbackPlan);
+ log.warn("Missing commit ({}) is part of rollback plan: {}",
rollbackPlan.getInstantToRollback().getCommitTime(), rollbackPlan);
}
} catch (IOException ex) {
- LOG.warn("Failed to deserialize rollback plan for instant: {}",
requestedInstant, ex);
+ log.warn("Failed to deserialize rollback plan for instant: {}",
requestedInstant, ex);
}
});
}
@@ -1663,7 +1656,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
try {
HoodieSchema readerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, new
StoragePath(logFilePathStr));
if (readerSchema == null) {
- LOG.warn("Cannot read schema from log file {}. Skip the check as
it's likely being written by an inflight instant.", logFilePathStr);
+ log.warn("Cannot read schema from log file {}. Skip the check as
it's likely being written by an inflight instant.", logFilePathStr);
continue;
}
reader =
@@ -1700,7 +1693,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
"Log file is committed in an instant in active timeline:
instantTime=%s %s",
instantTime, logFilePathStr));
} else {
- LOG.warn("Log file is uncommitted in a completed instant, likely
due to retry: instantTime={} {}", instantTime, logFilePathStr);
+ log.warn("Log file is uncommitted in a completed instant, likely
due to retry: instantTime={} {}", instantTime, logFilePathStr);
}
} else if
(completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) {
// The instant is in archived timeline
@@ -1710,18 +1703,18 @@ public class HoodieMetadataTableValidator implements
Serializable {
} else if (inflightInstantsTimeline.containsInstant(instantTime)) {
// The instant is inflight in active timeline
// hit an uncommitted block possibly from a failed write
- LOG.warn("Log file is uncommitted because of an inflight instant:
instantTime={} {}", instantTime, logFilePathStr);
+ log.warn("Log file is uncommitted because of an inflight instant:
instantTime={} {}", instantTime, logFilePathStr);
} else {
// The instant is after the start of the active timeline,
// but it cannot be found in the active timeline
- LOG.warn("Log file is uncommitted because the instant is after the
start of the active timeline but absent or in requested in the active timeline:
instantTime={} {}",
+ log.warn("Log file is uncommitted because the instant is after the
start of the active timeline but absent or in requested in the active timeline:
instantTime={} {}",
instantTime, logFilePathStr);
}
} else {
- LOG.warn("There is no log block in {}", logFilePathStr);
+ log.warn("There is no log block in {}", logFilePathStr);
}
} catch (IOException e) {
- LOG.warn("Cannot read log file {}. Skip the check as it's likely being
written by an inflight instant.",
+ log.warn("Cannot read log file {}. Skip the check as it's likely being
written by an inflight instant.",
logFilePathStr, e);
} finally {
FileIOUtils.closeQuietly(reader);
@@ -1756,11 +1749,11 @@ public class HoodieMetadataTableValidator implements
Serializable {
long toSleepMs = cfg.minValidateIntervalSeconds * 1000 -
(System.currentTimeMillis() - start);
if (toSleepMs > 0) {
- LOG.info("Last validate ran less than min validate interval: {}
s, sleep: {} ms.", cfg.minValidateIntervalSeconds, toSleepMs);
+ log.info("Last validate ran less than min validate interval: {}
s, sleep: {} ms.", cfg.minValidateIntervalSeconds, toSleepMs);
Thread.sleep(toSleepMs);
}
} catch (HoodieValidationException e) {
- LOG.error("Shutting down AsyncMetadataTableValidateService due to
HoodieValidationException", e);
+ log.error("Shutting down AsyncMetadataTableValidateService due to
HoodieValidationException", e);
if (!cfg.ignoreFailed) {
throw e;
}
@@ -1816,15 +1809,18 @@ public class HoodieMetadataTableValidator implements
Serializable {
* the same information regardless of whether metadata table is enabled,
which is
* verified in the {@link HoodieMetadataTableValidator}.
*/
+ @Slf4j
private static class HoodieMetadataValidationContext implements
AutoCloseable, Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetadataValidationContext.class);
-
private final Properties props;
+ @Getter
private final HoodieTableMetaClient metaClient;
+ @Getter
private final HoodieMetadataConfig metadataConfig;
+ @Getter
private final HoodieSchema schema;
private final HoodieTableFileSystemView fileSystemView;
+ @Getter
private final HoodieTableMetadata tableMetadata;
private final boolean enableMetadataTable;
private List<String> allColumnNameList;
@@ -1865,10 +1861,10 @@ public class HoodieMetadataTableValidator implements
Serializable {
FileSystemViewStorageConfig viewConf, HoodieCommonConfig commonConfig) {
switch (viewConf.getStorageType()) {
case SPILLABLE_DISK:
- LOG.debug("Creating Spillable Disk based Table View");
+ log.debug("Creating Spillable Disk based Table View");
break;
case MEMORY:
- LOG.debug("Creating in-memory based Table View");
+ log.debug("Creating in-memory based Table View");
break;
default:
throw new HoodieException("Unsupported storage type " +
viewConf.getStorageType() + ", used with HoodieMetadataTableValidator");
@@ -1876,22 +1872,6 @@ public class HoodieMetadataTableValidator implements
Serializable {
return (HoodieTableFileSystemView)
FileSystemViewManager.createViewManager(context, metadataConfig, viewConf,
commonConfig, unused -> tableMetadata).getFileSystemView(metaClient);
}
- public HoodieTableMetaClient getMetaClient() {
- return metaClient;
- }
-
- public HoodieMetadataConfig getMetadataConfig() {
- return metadataConfig;
- }
-
- public HoodieSchema getSchema() {
- return schema;
- }
-
- public HoodieTableMetadata getTableMetadata() {
- return tableMetadata;
- }
-
public List<HoodieBaseFile> getSortedLatestBaseFileList(String
partitionPath) {
return fileSystemView.getLatestBaseFiles(partitionPath)
.sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
@@ -1909,7 +1889,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
@SuppressWarnings({"rawtypes", "unchecked"})
public List<HoodieColumnRangeMetadata<Comparable>>
getSortedColumnStatsList(String partitionPath, List<String> fileNames,
HoodieSchema readerSchema) {
- LOG.info("All column names for getting column stats: {}",
allColumnNameList);
+ log.info("All column names for getting column stats: {}",
allColumnNameList);
if (enableMetadataTable) {
List<Pair<String, String>> partitionFileNameList = fileNames.stream()
.map(filename -> Pair.of(partitionPath,
filename)).collect(Collectors.toList());
@@ -1993,11 +1973,11 @@ public class HoodieMetadataTableValidator implements
Serializable {
.getFileReader(new HoodieConfig(), path)) {
bloomFilter = fileReader.readBloomFilter();
if (bloomFilter == null) {
- LOG.error("Failed to read bloom filter for {}", path);
+ log.error("Failed to read bloom filter for {}", path);
return Option.empty();
}
} catch (IOException e) {
- LOG.error("Failed to get file reader for {} {}", path, e);
+ log.error("Failed to get file reader for {}", path, e);
return Option.empty();
}
return Option.of(BloomFilterData.builder()
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index 11e4514d7d34..a3781999828f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -41,11 +41,10 @@ import org.apache.hudi.table.repair.RepairUtils;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -139,9 +138,9 @@ import java.util.stream.Collectors;
* --backup-path backup_path
* ```
*/
+@Slf4j
public class HoodieRepairTool {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieRepairTool.class);
private static final String BACKUP_DIR_PREFIX = "hoodie_repair_backup_";
// Repair config
private final Config cfg;
@@ -175,41 +174,39 @@ public class HoodieRepairTool {
Option<String> endingInstantOption =
Option.ofNullable(cfg.endingInstantTime);
if (startingInstantOption.isPresent() && endingInstantOption.isPresent()) {
- LOG.info(String.format("Start repairing completed instants between %s
and %s (inclusive)",
- startingInstantOption.get(), endingInstantOption.get()));
+ log.info("Start repairing completed instants between {} and {}
(inclusive)",
+ startingInstantOption.get(), endingInstantOption.get());
} else if (startingInstantOption.isPresent()) {
- LOG.info(String.format("Start repairing completed instants from %s
(inclusive)",
- startingInstantOption.get()));
+ log.info("Start repairing completed instants from {} (inclusive)",
startingInstantOption.get());
} else if (endingInstantOption.isPresent()) {
- LOG.info(String.format("Start repairing completed instants till %s
(inclusive)",
- endingInstantOption.get()));
+ log.info("Start repairing completed instants till {} (inclusive)",
endingInstantOption.get());
} else {
- LOG.info("Start repairing all completed instants");
+ log.info("Start repairing all completed instants");
}
try {
Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
switch (mode) {
case REPAIR:
- LOG.info(" ****** The repair tool is in REPAIR mode, dangling data
and logs files "
+ log.info(" ****** The repair tool is in REPAIR mode, dangling data
and logs files "
+ "not belonging to any commit are going to be DELETED from the
table ******");
if (checkBackupPathForRepair() < 0) {
- LOG.error("Backup path check failed.");
+ log.error("Backup path check failed.");
return false;
}
return doRepair(startingInstantOption, endingInstantOption, false);
case DRY_RUN:
- LOG.info(" ****** The repair tool is in DRY_RUN mode, "
+ log.info(" ****** The repair tool is in DRY_RUN mode, "
+ "only LOOKING FOR dangling data and log files from the table
******");
return doRepair(startingInstantOption, endingInstantOption, true);
case UNDO:
if (checkBackupPathAgainstBasePath() < 0) {
- LOG.error("Backup path check failed.");
+ log.error("Backup path check failed.");
return false;
}
return undoRepair();
default:
- LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ log.info("Unsupported running mode [{}], quit the job directly",
cfg.runningMode);
return false;
}
} catch (IOException e) {
@@ -229,7 +226,7 @@ public class HoodieRepairTool {
try {
new HoodieRepairTool(jsc, cfg).run();
} catch (Throwable throwable) {
- LOG.error("Fail to run table repair for " + cfg.basePath, throwable);
+ log.error("Fail to run table repair for {}", cfg.basePath, throwable);
} finally {
jsc.stop();
}
@@ -264,8 +261,7 @@ public class HoodieRepairTool {
}
} catch (IOException e) {
// Copy Fail
- LOG.error(String.format("Copying file fails: source [%s],
destination [%s]",
- sourcePath, destPath));
+ log.error("Copying file fails: source [{}], destination [{}]",
sourcePath, destPath);
} finally {
results.add(success);
}
@@ -321,7 +317,7 @@ public class HoodieRepairTool {
try {
success = fs.delete(new Path(basePath, relativeFilePath), false);
} catch (IOException e) {
- LOG.error("Failed to delete file {}", relativeFilePath);
+ log.error("Failed to delete file {}", relativeFilePath);
} finally {
results.add(success);
}
@@ -378,12 +374,12 @@ public class HoodieRepairTool {
.collect(Collectors.toList());
if (relativeFilePathsToDelete.size() > 0) {
if (!backupFiles(relativeFilePathsToDelete)) {
- LOG.error("Error backing up dangling files. Exiting...");
+ log.error("Error backing up dangling files. Exiting...");
return false;
}
return deleteFiles(context, cfg.basePath, relativeFilePathsToDelete);
}
- LOG.info(String.format("Table repair on %s is successful",
cfg.basePath));
+ log.info("Table repair on {} is successful", cfg.basePath);
}
return true;
}
@@ -398,14 +394,14 @@ public class HoodieRepairTool {
String backupPathStr = cfg.backupPath;
StoragePath backupPath = new StoragePath(backupPathStr);
if (!storage.exists(backupPath)) {
- LOG.error("Cannot find backup path: " + backupPath);
+ log.error("Cannot find backup path: {}", backupPath);
return false;
}
List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths();
if (allPartitionPaths.isEmpty()) {
- LOG.error("Cannot get one partition path since there is no partition
available");
+ log.error("Cannot get one partition path since there is no partition
available");
return false;
}
@@ -446,7 +442,7 @@ public class HoodieRepairTool {
StoragePath backupPath = new StoragePath(cfg.backupPath);
if (metaClient.getStorage().exists(backupPath)
&& metaClient.getStorage().listDirectEntries(backupPath).size() > 0) {
- LOG.error(String.format("Cannot use backup path %s: it is not empty",
cfg.backupPath));
+ log.error("Cannot use backup path {}: it is not empty", cfg.backupPath);
return -1;
}
@@ -461,13 +457,12 @@ public class HoodieRepairTool {
*/
int checkBackupPathAgainstBasePath() {
if (cfg.backupPath == null) {
- LOG.error("Backup path is not configured");
+ log.error("Backup path is not configured");
return -1;
}
if (cfg.backupPath.contains(cfg.basePath)) {
- LOG.error(String.format("Cannot use backup path %s: it resides in the
base path %s",
- cfg.backupPath, cfg.basePath));
+ log.error("Cannot use backup path {}: it resides in the base path {}",
cfg.backupPath, cfg.basePath);
return -1;
}
return 0;
@@ -502,11 +497,11 @@ public class HoodieRepairTool {
private void printRepairInfo(
List<String> instantTimesToRepair, List<ImmutablePair<String,
List<String>>> instantsWithDanglingFiles) {
int numInstantsToRepair = instantsWithDanglingFiles.size();
- LOG.info("Number of instants verified based on the base and log files:
{}", instantTimesToRepair.size());
- LOG.info("Instant timestamps: {}", instantTimesToRepair);
- LOG.info("Number of instants to repair: {}", numInstantsToRepair);
+ log.info("Number of instants verified based on the base and log files:
{}", instantTimesToRepair.size());
+ log.info("Instant timestamps: {}", instantTimesToRepair);
+ log.info("Number of instants to repair: {}", numInstantsToRepair);
if (numInstantsToRepair > 0) {
- instantsWithDanglingFiles.forEach(e -> LOG.info(" ** Removing files:
{}", e.getValue()));
+ instantsWithDanglingFiles.forEach(e -> log.info(" ** Removing files:
{}", e.getValue()));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 38794c5345fd..78e72ee31c3b 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -49,6 +49,7 @@ import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -62,8 +63,6 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -79,6 +78,7 @@ import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
/**
* Export the latest records of Hudi dataset to a set of external files (e.g.,
plain parquet files).
*/
+@Slf4j
public class HoodieSnapshotExporter {
@FunctionalInterface
@@ -88,8 +88,6 @@ public class HoodieSnapshotExporter {
}
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSnapshotExporter.class);
-
public static class OutputFormatValidator implements IValueValidator<String>
{
public static final String HUDI = "hudi";
@@ -159,8 +157,7 @@ public class HoodieSnapshotExporter {
.<HoodieSnapshotExporterException>orElseThrow(() -> {
throw new HoodieSnapshotExporterException("No commits present.
Nothing to snapshot.");
});
- LOG.info(String.format("Starting to snapshot latest version files which
are also no-late-than %s.",
- latestCommitTimestamp));
+ log.info("Starting to snapshot latest version files which are also
no-late-than {}.", latestCommitTimestamp);
final HoodieSparkEngineContext engineContext = new
HoodieSparkEngineContext(jsc);
final List<String> partitions = getPartitions(engineContext, cfg,
HoodieStorageUtils.getStorage(
@@ -169,7 +166,7 @@ public class HoodieSnapshotExporter {
if (partitions.isEmpty()) {
throw new HoodieSnapshotExporterException("The source dataset has 0
partition to snapshot.");
}
- LOG.info(String.format("The job needs to export %d partitions.",
partitions.size()));
+ log.info("The job needs to export {} partitions.", partitions.size());
if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp,
tableMetadata);
@@ -194,7 +191,7 @@ public class HoodieSnapshotExporter {
private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
- LOG.info(String.format("Creating _SUCCESS under target output path: %s",
cfg.targetOutputPath));
+ log.info("Creating _SUCCESS under target output path: {}",
cfg.targetOutputPath);
fs.createNewFile(successTagPath);
}
}
@@ -285,7 +282,7 @@ public class HoodieSnapshotExporter {
}, parallelism);
// Also copy the .commit files
- LOG.info(String.format("Copying .commit files which are no-late-than %s.",
latestCommitTimestamp));
+ log.info("Copying .commit files which are no-late-than {}.",
latestCommitTimestamp);
List<FileStatus> commitFilesListToCopy =
Arrays.stream(sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/" +
HoodieTableMetaClient.TIMELINEFOLDER_NAME)))
.filter(fileStatus -> {
@@ -344,7 +341,7 @@ public class HoodieSnapshotExporter {
}
JavaSparkContext jsc =
UtilHelpers.buildSparkContext("Hoodie-snapshot-exporter", "local[*]",
cfg.enableHiveSupport);
- LOG.info("Initializing spark job.");
+ log.info("Initializing spark job.");
try {
new HoodieSnapshotExporter().export(jsc, cfg);
@@ -359,13 +356,13 @@ public class HoodieSnapshotExporter {
switch (config.transformerClassName) {
case "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer":
if (StringUtils.isNullOrEmpty(config.transformerSql)) {
- LOG.error("--transformer-sql is required when using
SqlQueryBasedTransformer");
+ log.error("--transformer-sql is required when using
SqlQueryBasedTransformer");
valid = false;
}
break;
case "org.apache.hudi.utilities.transform.SqlFileBasedTransformer":
if (StringUtils.isNullOrEmpty(config.transformerSqlFile)) {
- LOG.error("--transformer-sql-file is required when using
SqlFileBasedTransformer");
+ log.error("--transformer-sql-file is required when using
SqlFileBasedTransformer");
valid = false;
}
break;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
index 519ce482d6ec..3d6144976028 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
@@ -31,10 +31,9 @@ import org.apache.hudi.exception.HoodieException;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
@@ -43,9 +42,9 @@ import java.util.List;
/**
* Utility class to run TTL management.
*/
+@Slf4j
public class HoodieTTLJob {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieTTLJob.class);
private final Config cfg;
private final TypedProperties props;
private final JavaSparkContext jsc;
@@ -61,7 +60,7 @@ public class HoodieTTLJob {
this.jsc = jsc;
this.props = props;
this.metaClient = metaClient;
- LOG.info("Creating TTL job with configs : " + props.toString());
+ log.info("Creating TTL job with configs : {}", props.toString());
// Disable async cleaning, will trigger synchronous cleaning manually.
this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
@@ -129,7 +128,7 @@ public class HoodieTTLJob {
SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc,
exitCode);
}
- LOG.info("Hoodie TTL job ran successfully");
+ log.info("Hoodie TTL job ran successfully");
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
index 524c4f549d2b..d1e25849c4b9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -41,12 +41,11 @@ import com.beust.jcommander.Parameter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -97,10 +96,10 @@ import java.util.stream.Collectors;
* --base-path <base-path> \
* --num-days <number-of-days>
*/
+@Slf4j
public class TableSizeStats implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(TableSizeStats.class);
// Date formatter for parsing partition dates (example: 2023/5/5/ or
2023-5-5).
private static final DateTimeFormatter DATE_FORMATTER =
@@ -138,6 +137,7 @@ public class TableSizeStats implements Serializable {
}
public static class Config implements Serializable {
+
@Parameter(names = {"--base-path", "-bp"}, description = "Base path for
the table", required = false)
public String basePath = null;
@@ -241,9 +241,9 @@ public class TableSizeStats implements Serializable {
TableSizeStats tableSizeStats = new TableSizeStats(jsc, cfg);
tableSizeStats.run();
} catch (TableNotFoundException e) {
- LOG.warn("The Hudi data table is not found: [{}].", cfg.basePath, e);
+ log.warn("The Hudi data table is not found: [{}].", cfg.basePath, e);
} catch (Throwable throwable) {
- LOG.error("Failed to get table size stats for {}", cfg, throwable);
+ log.error("Failed to get table size stats for {}", cfg, throwable);
} finally {
jsc.stop();
}
@@ -251,8 +251,8 @@ public class TableSizeStats implements Serializable {
public void run() {
try {
- LOG.info(cfg.toString());
- LOG.info(" ****** Fetching table size stats ******");
+ log.info(cfg.toString());
+ log.info(" ****** Fetching table size stats ******");
// Determine starting and ending date intervals for filtering data files.
LocalDate[] dateInterval = getUserSpecifiedDateInterval(cfg);
@@ -276,7 +276,7 @@ public class TableSizeStats implements Serializable {
private void logTableStats(String basePath, LocalDate[] dateInterval) throws
IOException {
- LOG.info("Processing table {}", basePath);
+ log.info("Processing table {}", basePath);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(isMetadataEnabled(basePath, jsc))
.build();
@@ -351,7 +351,7 @@ public class TableSizeStats implements Serializable {
logStats("Table stats [path: " + basePath + "]", tableHistogram);
} else {
// Display only total talbe size
- LOG.info("Total size: {}",
getFileSizeUnit(Arrays.stream(tableHistogram.getSnapshot().getValues()).sum()));
+ log.info("Total size: {}",
getFileSizeUnit(Arrays.stream(tableHistogram.getSnapshot().getValues()).sum()));
}
}
@@ -378,7 +378,7 @@ public class TableSizeStats implements Serializable {
line = reader.readLine();
}
} catch (IOException ioe) {
- LOG.error("Error reading in properties from dfs from file." + propsPath);
+ log.error("Error reading in properties from dfs from file. {}",
propsPath);
throw new HoodieIOException("Cannot read properties from dfs from file "
+ propsPath, ioe);
}
return filePaths;
@@ -390,12 +390,12 @@ public class TableSizeStats implements Serializable {
if (cfg.endDate != null) {
try {
endDate = LocalDate.parse(cfg.endDate, DATE_FORMATTER);
- LOG.info("Setting ending date to {}. ", endDate);
+ log.info("Setting ending date to {}.", endDate);
} catch (DateTimeParseException dtpe) {
throw new HoodieException("Unable to parse --end-date. ", dtpe);
}
} else {
- LOG.info("End date is not specified: {}.", endDate);
+ log.info("End date is not specified: {}.", endDate);
}
// Set startDate to null by default.
@@ -404,14 +404,14 @@ public class TableSizeStats implements Serializable {
// Set startDate to cfg.startDate if specified. cfg.startDate takes
priority over cfg.numDays if both are specified.
if (cfg.startDate != null) {
startDate = LocalDate.parse(cfg.startDate, DATE_FORMATTER);
- LOG.info("Setting starting date to {}.", startDate);
+ log.info("Setting starting date to {}.", startDate);
} else {
if (cfg.numDays == 0) {
- LOG.info("Start date not specified: {}.", startDate);
+ log.info("Start date not specified: {}.", startDate);
} else if (cfg.numDays > 0) {
endDate = LocalDate.now();
startDate = endDate.minusDays(cfg.numDays);
- LOG.info("Setting starting date to {} ({} - {} days). ", startDate,
endDate, cfg.numDays);
+ log.info("Setting starting date to {} ({} - {} days). ", startDate,
endDate, cfg.numDays);
} else {
throw new HoodieException("--num-days must specify a positive value.");
}
@@ -422,7 +422,7 @@ public class TableSizeStats implements Serializable {
throw new HoodieException("Starting date must be before ending date.
Start Date: " + startDate + ", End Date: " + endDate);
}
- return startDate == null && endDate == null ? null : new
LocalDate[]{startDate, endDate};
+ return startDate == null && endDate == null ? null : new LocalDate[]
{startDate, endDate};
}
@Nullable
@@ -442,7 +442,7 @@ public class TableSizeStats implements Serializable {
try {
return LocalDate.parse(dateString, DATE_FORMATTER);
} catch (DateTimeParseException dtpe) {
- LOG.error("Partition name {} must conform to date format if
--start-date, --end-date, or --num-days are specified. ", partition, dtpe);
+ log.error("Partition name {} must conform to date format if
--start-date, --end-date, or --num-days are specified. ", partition, dtpe);
}
return partitionDate;
}
@@ -458,17 +458,17 @@ public class TableSizeStats implements Serializable {
}
private static void logStats(String header, Histogram histogram) {
- LOG.info(header);
+ log.info(header);
Snapshot snapshot = histogram.getSnapshot();
- LOG.info("Number of files: {}", snapshot.size());
- LOG.info("Total size: {}",
getFileSizeUnit(Arrays.stream(snapshot.getValues()).sum()));
- LOG.info("Minimum file size: {}", getFileSizeUnit(snapshot.getMin()));
- LOG.info("Maximum file size: {}", getFileSizeUnit(snapshot.getMax()));
- LOG.info("Average file size: {}", getFileSizeUnit(snapshot.getMean()));
- LOG.info("Median file size: {}", getFileSizeUnit(snapshot.getMedian()));
- LOG.info("P50 file size: {}", getFileSizeUnit(snapshot.getValue(0.5)));
- LOG.info("P90 file size: {}", getFileSizeUnit(snapshot.getValue(0.9)));
- LOG.info("P95 file size: {}", getFileSizeUnit(snapshot.getValue(0.95)));
- LOG.info("P99 file size: {}", getFileSizeUnit(snapshot.getValue(0.99)));
+ log.info("Number of files: {}", snapshot.size());
+ log.info("Total size: {}",
getFileSizeUnit(Arrays.stream(snapshot.getValues()).sum()));
+ log.info("Minimum file size: {}", getFileSizeUnit(snapshot.getMin()));
+ log.info("Maximum file size: {}", getFileSizeUnit(snapshot.getMax()));
+ log.info("Average file size: {}", getFileSizeUnit(snapshot.getMean()));
+ log.info("Median file size: {}", getFileSizeUnit(snapshot.getMedian()));
+ log.info("P50 file size: {}", getFileSizeUnit(snapshot.getValue(0.5)));
+ log.info("P90 file size: {}", getFileSizeUnit(snapshot.getValue(0.9)));
+ log.info("P95 file size: {}", getFileSizeUnit(snapshot.getValue(0.95)));
+ log.info("P99 file size: {}", getFileSizeUnit(snapshot.getValue(0.99)));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 9d04641f71b4..22bf3f4b4dcb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -76,6 +76,7 @@ import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -93,8 +94,6 @@ import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.LongAccumulator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
@@ -126,6 +125,7 @@ import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_MAKE_
/**
* Bunch of helper methods.
*/
+@Slf4j
public class UtilHelpers {
public static final String EXECUTE = "execute";
@@ -133,8 +133,6 @@ public class UtilHelpers {
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
public static final String PURGE_PENDING_INSTANT = "purge_pending_instant";
- private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);
-
public static HoodieRecordMerger createRecordMerger(Properties props) {
return HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK,
StringUtils.split(ConfigUtils.getStringWithAltKeys(props,
HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES, null), ","),
@@ -142,7 +140,7 @@ public class UtilHelpers {
}
public static Source createSource(String sourceClass, TypedProperties cfg,
JavaSparkContext jssc,
- SparkSession sparkSession,
HoodieIngestionMetrics metrics, StreamContext streamContext) throws IOException
{
+ SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext
streamContext) throws IOException {
// All possible constructors.
Class<?>[] constructorArgsStreamContextMetrics = new Class<?>[]
{TypedProperties.class, JavaSparkContext.class, SparkSession.class,
HoodieIngestionMetrics.class, StreamContext.class};
Class<?>[] constructorArgsStreamContext = new Class<?>[]
{TypedProperties.class, JavaSparkContext.class, SparkSession.class,
StreamContext.class};
@@ -168,7 +166,7 @@ public class UtilHelpers {
String constructorSignature = Arrays.stream(constructor.getLeft())
.map(Class::getSimpleName)
.collect(Collectors.joining(", ", "[", "]"));
- LOG.error("Unexpected error while loading source class {} with
constructor signature {}", sourceClass, constructorSignature, e);
+ log.error("Unexpected error while loading source class {} with
constructor signature {}", sourceClass, constructorSignature, e);
} catch (Throwable t) {
throw new IOException("Could not load source class due to unexpected
error " + sourceClass, t);
}
@@ -197,7 +195,7 @@ public class UtilHelpers {
}
public static SchemaProvider createSchemaProvider(String
schemaProviderClass, TypedProperties cfg,
- JavaSparkContext jssc)
throws IOException {
+ JavaSparkContext jssc) throws IOException {
try {
return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
: (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass,
cfg, jssc);
@@ -233,7 +231,7 @@ public class UtilHelpers {
}
public static Option<Transformer> createTransformer(Option<List<String>>
classNamesOpt, Supplier<Option<HoodieSchema>> sourceSchemaSupplier,
- boolean
isErrorTableWriterEnabled) throws IOException {
+ boolean isErrorTableWriterEnabled) throws IOException {
try {
Function<List<String>, Transformer> chainedTransformerFunction =
classNames ->
@@ -255,13 +253,13 @@ public class UtilHelpers {
}
public static DFSPropertiesConfiguration readConfig(Configuration
hadoopConfig,
- Path cfgPath,
- List<String>
overriddenProps) {
+ Path cfgPath,
+ List<String> overriddenProps) {
StoragePath storagePath = convertToStoragePath(cfgPath);
DFSPropertiesConfiguration conf = new
DFSPropertiesConfiguration(hadoopConfig, storagePath);
try {
if (!overriddenProps.isEmpty()) {
- LOG.info("Adding overridden properties to file properties.");
+ log.info("Adding overridden properties to file properties.");
conf.addPropsFromStream(new BufferedReader(new
StringReader(String.join("\n", overriddenProps))), storagePath);
}
} catch (IOException ioe) {
@@ -275,7 +273,7 @@ public class UtilHelpers {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration();
try {
if (!overriddenProps.isEmpty()) {
- LOG.info("Adding overridden properties to file properties.");
+ log.info("Adding overridden properties to file properties.");
conf.addPropsFromStream(new BufferedReader(new
StringReader(String.join("\n", overriddenProps))), null);
}
} catch (IOException ioe) {
@@ -289,7 +287,7 @@ public class UtilHelpers {
return StringUtils.isNullOrEmpty(propsFilePath)
? UtilHelpers.buildProperties(props)
: UtilHelpers.readConfig(hadoopConf, new Path(propsFilePath), props)
- .getProps(true);
+ .getProps(true);
}
public static TypedProperties buildProperties(List<String> props) {
@@ -310,7 +308,7 @@ public class UtilHelpers {
/**
* Parse Schema from file.
*
- * @param fs File System
+ * @param fs File System
* @param schemaFile Schema File
*/
public static String parseSchema(FileSystem fs, String schemaFile) throws
Exception {
@@ -412,9 +410,9 @@ public class UtilHelpers {
/**
* Build Hoodie write client.
*
- * @param jsc Java Spark Context
- * @param basePath Base Path
- * @param schemaStr Schema
+ * @param jsc Java Spark Context
+ * @param basePath Base Path
+ * @param schemaStr Schema
* @param parallelism Parallelism
*/
public static SparkRDDWriteClient<HoodieRecordPayload>
createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
@@ -440,14 +438,14 @@ public class UtilHelpers {
writeResponse.foreach(writeStatus -> {
if (writeStatus.hasErrors()) {
errors.add(1);
- LOG.error("Error processing records :writeStatus:{}",
writeStatus.getStat().toString());
+ log.error("Error processing records :writeStatus:{}",
writeStatus.getStat().toString());
}
});
if (errors.value() == 0) {
- LOG.info("Table imported into hoodie with {} instant time.",
instantTime);
+ log.info("Table imported into hoodie with {} instant time.",
instantTime);
return 0;
}
- LOG.error("Import failed with {} errors.", errors.value());
+ log.error("Import failed with {} errors.", errors.value());
return -1;
}
@@ -455,11 +453,11 @@ public class UtilHelpers {
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
long errorsCount =
writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
if (errorsCount == 0) {
- LOG.info("Finish job with {} instant time.", instantTime);
+ log.info("Finish job with {} instant time.", instantTime);
return 0;
}
- LOG.error("Job failed with {} errors.", errorsCount);
+ log.error("Job failed with {} errors.", errorsCount);
return -1;
}
@@ -571,7 +569,7 @@ public class UtilHelpers {
}
public static SchemaProvider
wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
-
TypedProperties cfg, JavaSparkContext jssc, List<String>
transformerClassNames) {
+ TypedProperties cfg, JavaSparkContext jssc, List<String>
transformerClassNames) {
if (provider == null) {
return null;
@@ -601,16 +599,16 @@ public class UtilHelpers {
}
public static SchemaProvider createRowBasedSchemaProvider(StructType
structType,
- TypedProperties
cfg,
- JavaSparkContext
jssc) {
+ TypedProperties cfg,
+ JavaSparkContext jssc) {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc,
null);
}
public static Option<HoodieSchema> getLatestTableSchema(JavaSparkContext
jssc,
- HoodieStorage
storage,
- String basePath,
-
HoodieTableMetaClient tableMetaClient) {
+ HoodieStorage storage,
+ String basePath,
+ HoodieTableMetaClient tableMetaClient) {
try {
if (FSUtils.isTableExists(basePath, storage)) {
TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(tableMetaClient);
@@ -618,7 +616,7 @@ public class UtilHelpers {
return tableSchemaResolver.getTableSchemaFromLatestCommit(false);
}
} catch (Exception e) {
- LOG.warn("Failed to fetch latest table's schema", e);
+ log.warn("Failed to fetch latest table's schema", e);
}
return Option.empty();
@@ -655,6 +653,7 @@ public class UtilHelpers {
@FunctionalInterface
public interface CheckedSupplier<T> {
+
T get() throws Throwable;
}
@@ -665,7 +664,7 @@ public class UtilHelpers {
ret = supplier.get();
} while (ret != 0 && maxRetryCount-- > 0);
} catch (Throwable t) {
- LOG.error(errorMessage, t);
+ log.error(errorMessage, t);
throw new RuntimeException("Failed in retry", t);
}
return ret;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
index df466a0a7dd6..382fe27b5d1d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
@@ -25,13 +25,12 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Properties;
@@ -45,10 +44,9 @@ import static
org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCal
/**
* Kafka implementation of {@link HoodieWriteCommitCallback}.
*/
+@Slf4j
public class HoodieWriteCommitKafkaCallback implements
HoodieWriteCommitCallback {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieWriteCommitKafkaCallback.class);
-
private final HoodieConfig hoodieConfig;
private final String bootstrapServers;
private final String topic;
@@ -66,9 +64,9 @@ public class HoodieWriteCommitKafkaCallback implements
HoodieWriteCommitCallback
try (KafkaProducer<String, String> producer =
createProducer(hoodieConfig)) {
ProducerRecord<String, String> record =
buildProducerRecord(hoodieConfig, callbackMsg);
producer.send(record).get();
- LOG.info("Send callback message succeed");
+ log.info("Send callback message succeed");
} catch (Exception e) {
- LOG.error("Send kafka callback msg failed : ", e);
+ log.error("Send kafka callback msg failed : ", e);
}
}
@@ -94,8 +92,8 @@ public class HoodieWriteCommitKafkaCallback implements
HoodieWriteCommitCallback
kafkaProducerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
- LOG.debug("Callback kafka producer init with configs: "
- +
HoodieWriteCommitCallbackUtil.convertToJsonString(kafkaProducerProps));
+ log.debug("Callback kafka producer init with configs: {}",
+ HoodieWriteCommitCallbackUtil.convertToJsonString(kafkaProducerProps));
return new KafkaProducer<String, String>(kafkaProducerProps);
}
@@ -138,11 +136,11 @@ public class HoodieWriteCommitKafkaCallback implements
HoodieWriteCommitCallback
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null != metadata) {
- LOG.info("message offset={} partition={} timestamp={} topic={}",
+ log.info("message offset={} partition={} timestamp={} topic={}",
metadata.offset(), metadata.partition(), metadata.timestamp(),
metadata.topic());
}
if (null != exception) {
- LOG.error("Send kafka callback msg failed : ", exception);
+ log.error("Send kafka callback msg failed: ", exception);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
index af4bbbf49e46..379aba2d19ec 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -32,8 +33,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -56,10 +55,9 @@ import static
org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarC
/**
* Pulsar implementation of {@link HoodieWriteCommitCallback}.
*/
+@Slf4j
public class HoodieWriteCommitPulsarCallback implements
HoodieWriteCommitCallback, Closeable {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieWriteCommitPulsarCallback.class);
-
private final String serviceUrl;
private final String topic;
@@ -85,9 +83,9 @@ public class HoodieWriteCommitPulsarCallback implements
HoodieWriteCommitCallbac
String callbackMsg =
HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
try {
producer.newMessage().key(callbackMessage.getTableName()).value(callbackMsg).send();
- LOG.info("Send callback message succeed");
+ log.info("Send callback message succeed");
} catch (Exception e) {
- LOG.error("Send pulsar callback msg failed : ", e);
+ log.error("Send pulsar callback msg failed: ", e);
}
}
@@ -165,7 +163,7 @@ public class HoodieWriteCommitPulsarCallback implements
HoodieWriteCommitCallbac
try {
producer.close();
} catch (Throwable t) {
- LOG.warn("Could not properly close the producer.", t);
+ log.warn("Could not properly close the producer.", t);
}
}
@@ -173,7 +171,7 @@ public class HoodieWriteCommitPulsarCallback implements
HoodieWriteCommitCallbac
try {
client.close();
} catch (Throwable t) {
- LOG.warn("Could not properly close the client.", t);
+ log.warn("Could not properly close the client.", t);
}
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
index 365a01b60409..a2ffb4878454 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -23,6 +23,7 @@ import org.apache.hudi.exception.HoodieException;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.NoArgsConstructor;
import org.apache.avro.Schema;
import org.apache.kafka.common.errors.SerializationException;
@@ -35,13 +36,11 @@ import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DES
/**
* Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to
inject reader schema during deserialization.
*/
+@NoArgsConstructor
public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
private Schema sourceSchema;
- public KafkaAvroSchemaDeserializer() {
- }
-
public KafkaAvroSchemaDeserializer(SchemaRegistryClient client, Map<String,
?> props) {
super(client, props);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
index 04174f740767..57f3e7910c8e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
@@ -20,14 +20,14 @@ package org.apache.hudi.utilities.ingestion;
import org.apache.hudi.exception.HoodieException;
+import lombok.NoArgsConstructor;
+
/**
* The root exception class for any failure with {@link
HoodieIngestionService}.
*/
+@NoArgsConstructor
public class HoodieIngestionException extends HoodieException {
- public HoodieIngestionException() {
- }
-
public HoodieIngestionException(String message) {
super(message);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
index 5f2c15f090c4..2769d851b107 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
@@ -27,8 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.streamer.PostWriteTerminationStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -40,10 +39,9 @@ import static
org.apache.hudi.utilities.ingestion.HoodieIngestionService.HoodieI
/**
* A generic service to facilitate running data ingestion.
*/
+@Slf4j
public abstract class HoodieIngestionService extends HoodieAsyncService {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieIngestionService.class);
-
protected HoodieIngestionConfig ingestionConfig;
public HoodieIngestionService(HoodieIngestionConfig ingestionConfig) {
@@ -59,18 +57,18 @@ public abstract class HoodieIngestionService extends
HoodieAsyncService {
*/
public void startIngestion() {
if (ingestionConfig.getBoolean(INGESTION_IS_CONTINUOUS)) {
- LOG.info("Ingestion service starts running in continuous mode");
+ log.info("Ingestion service starts running in continuous mode");
start(this::onIngestionCompletes);
try {
waitForShutdown();
} catch (Exception e) {
throw new HoodieIngestionException("Ingestion service was shut down
with exception.", e);
}
- LOG.info("Ingestion service (continuous mode) has been shut down.");
+ log.info("Ingestion service (continuous mode) has been shut down.");
} else {
- LOG.info("Ingestion service starts running in run-once mode");
+ log.info("Ingestion service starts running in run-once mode");
ingestOnce();
- LOG.info("Ingestion service (run-once mode) has been shut down.");
+ log.info("Ingestion service (run-once mode) has been shut down.");
}
}
@@ -121,8 +119,8 @@ public abstract class HoodieIngestionService extends
HoodieAsyncService {
long minSyncInternalSeconds =
ingestionConfig.getLongOrDefault(INGESTION_MIN_SYNC_INTERNAL_SECONDS);
long sleepMs = minSyncInternalSeconds * 1000 -
(System.currentTimeMillis() - ingestionStartEpochMillis);
if (sleepMs > 0) {
- LOG.info(String.format("Last ingestion took less than min sync
interval: %d s; sleep for %.2f s",
- minSyncInternalSeconds, sleepMs / 1000.0));
+ log.info("Last ingestion took less than min sync interval: {} s; sleep
for {} s",
+ minSyncInternalSeconds, sleepMs / 1000.0);
Thread.sleep(sleepMs);
}
} catch (InterruptedException e) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
index fe5decd7005c..03d67dd2d969 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
@@ -25,21 +25,20 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.UtilHelpers;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Archive task to run in TableServicePipeline.
*
* @see HoodieMultiTableServicesMain
*/
+@Slf4j
class ArchiveTask extends TableServiceTask {
- private static final Logger LOG = LoggerFactory.getLogger(ArchiveTask.class);
@Override
void run() {
- LOG.info("Run Archive with props: " + props);
+ log.info("Run Archive with props: {}", props);
HoodieWriteConfig hoodieCfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(new
HoodieSparkEngineContext(jsc), hoodieCfg)) {
UtilHelpers.retry(retry, () -> {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
index 66efbd475dc4..34d585b903d6 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.utilities.HoodieClusteringJob;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.spark.api.java.JavaSparkContext;
/**
@@ -71,6 +73,7 @@ class ClusteringTask extends TableServiceTask {
/**
* Builder class for {@link ClusteringTask}.
*/
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
public static final class Builder {
/**
@@ -110,9 +113,6 @@ class ClusteringTask extends TableServiceTask {
*/
private HoodieTableMetaClient metaClient;
- private Builder() {
- }
-
public Builder withProps(TypedProperties props) {
this.props = props;
return this;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
index aade3d0a4854..9d4af77d3f42 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
@@ -28,11 +28,10 @@ import org.apache.hudi.utilities.UtilHelpers;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -51,8 +50,9 @@ import java.util.stream.Collectors;
/**
* Main function for executing multi-table services.
*/
+@Slf4j
public class HoodieMultiTableServicesMain {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMultiTableServicesMain.class);
+
final Config cfg;
final TypedProperties props;
@@ -107,7 +107,7 @@ public class HoodieMultiTableServicesMain {
}
public void startServices() throws ExecutionException, InterruptedException {
- LOG.info("StartServices Config: " + cfg);
+ log.info("StartServices Config: {}", cfg);
List<String> tablePaths;
if (cfg.autoDiscovery) {
// We support defining multi base paths
@@ -118,7 +118,7 @@ public class HoodieMultiTableServicesMain {
} else {
tablePaths = MultiTableServiceUtils.getTablesToBeServedFromProps(jsc,
props);
}
- LOG.info("All table paths: " + String.join(",", tablePaths));
+ log.info("All table paths: {}", String.join(",", tablePaths));
if (cfg.batch) {
batchRunTableServices(tablePaths);
} else {
@@ -253,7 +253,7 @@ public class HoodieMultiTableServicesMain {
try {
new HoodieMultiTableServicesMain(jsc, cfg).startServices();
} catch (Throwable throwable) {
- LOG.error("Fail to run table services, ", throwable);
+ log.error("Fail to run table services, ", throwable);
} finally {
jsc.stop();
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index 41bd7248f0f3..3c1f1453f54e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -29,13 +29,12 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.utilities.UtilHelpers;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,8 +48,8 @@ import static
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
/**
* Utils for executing multi-table services.
*/
+@Slf4j
public class MultiTableServiceUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(MultiTableServiceUtils.class);
public static class Constants {
public static final String TABLES_TO_BE_SERVED_PROP =
"hoodie.tableservice.tablesToServe";
@@ -79,7 +78,7 @@ public class MultiTableServiceUtils {
return true;
} else {
// Log the wrong path in console.
- LOG.info("Hoodie table not found in path {}, skip", tablePath);
+ log.info("Hoodie table not found in path {}, skip", tablePath);
return false;
}
}).collect(Collectors.toList());
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 330eece72e9a..39efad86c3d3 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -40,10 +40,9 @@ import com.beust.jcommander.Parameter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
@@ -62,10 +61,10 @@ import java.util.stream.IntStream;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+@Slf4j
public class TimelineServerPerf implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(TimelineServerPerf.class);
private final Config cfg;
private transient TimelineService timelineServer;
private final boolean useExternalTimelineServer;
@@ -84,10 +83,10 @@ public class TimelineServerPerf implements Serializable {
private void setHostAddrFromSparkConf(SparkConf sparkConf) {
String hostAddr = sparkConf.get("spark.driver.host", null);
if (hostAddr != null) {
- LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}",
hostAddr, this.hostAddr);
+ log.info("Overriding hostIp to ({}) found in spark-conf. It was {}",
hostAddr, this.hostAddr);
this.hostAddr = hostAddr;
} else {
- LOG.warn("Unable to find driver bind address from spark config");
+ log.warn("Unable to find driver bind address from spark config");
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
index 5662c46169ce..dd7924a62a56 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
@@ -21,11 +21,13 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
+import lombok.Getter;
import org.apache.spark.api.java.JavaSparkContext;
/**
* SchemaProvider which uses separate Schema Providers for source and target.
*/
+@Getter
public final class DelegatingSchemaProvider extends SchemaProvider {
private final SchemaProvider sourceSchemaProvider;
@@ -48,12 +50,4 @@ public final class DelegatingSchemaProvider extends
SchemaProvider {
public HoodieSchema getTargetHoodieSchema() {
return targetSchemaProvider.getTargetHoodieSchema();
}
-
- public SchemaProvider getSourceSchemaProvider() {
- return sourceSchemaProvider;
- }
-
- public SchemaProvider getTargetSchemaProvider() {
- return targetSchemaProvider;
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 97447580ced2..ff843f1ab537 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -31,6 +31,7 @@ import
org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import lombok.Getter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +55,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private final String sourceFile;
private final String targetFile;
+ @Getter
protected Schema sourceSchema;
protected Schema targetSchema;
@@ -74,11 +76,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
return readSchemaFromFile(schemaFile, this.fs, config);
}
- @Override
- public Schema getSourceSchema() {
- return sourceSchema;
- }
-
@Override
public Schema getTargetSchema() {
if (targetSchema != null) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 6c58fb2739dc..7ec566b33db3 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
+import lombok.Getter;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -42,10 +43,11 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
/**
* A schema provider to get data schema through user specified hive table.
*/
+@Getter
public class HiveSchemaProvider extends SchemaProvider {
- private final HoodieSchema sourceSchema;
- private HoodieSchema targetSchema;
+ private final HoodieSchema sourceHoodieSchema;
+ private HoodieSchema targetHoodieSchema;
public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
@@ -58,7 +60,7 @@ public class HiveSchemaProvider extends SchemaProvider {
try {
TableIdentifier sourceSchemaTable = new
TableIdentifier(sourceSchemaTableName,
scala.Option.apply(sourceSchemaDatabaseName));
StructType sourceSchema =
spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
- this.sourceSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ this.sourceHoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
sourceSchema,
sourceSchemaTableName,
"hoodie." + sourceSchemaDatabaseName);
@@ -73,7 +75,7 @@ public class HiveSchemaProvider extends SchemaProvider {
try {
TableIdentifier targetSchemaTable = new
TableIdentifier(targetSchemaTableName,
scala.Option.apply(targetSchemaDatabaseName));
StructType targetSchema =
spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
- this.targetSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ this.targetHoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
targetSchema,
targetSchemaTableName,
"hoodie." + targetSchemaDatabaseName);
@@ -84,14 +86,16 @@ public class HiveSchemaProvider extends SchemaProvider {
}
@Override
+ @Deprecated
public Schema getSourceSchema() {
- return sourceSchema.toAvroSchema();
+ return getSourceHoodieSchema().toAvroSchema();
}
@Override
+ @Deprecated
public Schema getTargetSchema() {
- if (targetSchema != null) {
- return targetSchema.toAvroSchema();
+ if (getTargetHoodieSchema() != null) {
+ return getTargetHoodieSchema().toAvroSchema();
} else {
return super.getTargetSchema();
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index b23f7d24fbc1..159634c8818d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -40,13 +40,12 @@ import
io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
@@ -79,8 +78,9 @@ import static
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
* <p>
* https://github.com/confluentinc/schema-registry
*/
+@Slf4j
public class SchemaRegistryProvider extends SchemaProvider {
- private static final Logger LOG =
LoggerFactory.getLogger(SchemaRegistryProvider.class);
+
private static final Pattern URL_PATTERN =
Pattern.compile("(.*/)subjects/(.*)/versions/(.*)");
private static final String LATEST = "latest";
@@ -195,7 +195,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
} catch (IllegalAccessError error) {
// If we're not processing Protobuf schema, fall back to the legacy
method
if (!ProtobufSchema.TYPE.equalsIgnoreCase(schemaType)) {
- LOG.warn("Falling back to legacy schema retrieval due to
IllegalAccessError", error);
+ log.warn("Falling back to legacy schema retrieval due to
IllegalAccessError", error);
return fetchSchemaUsingLegacyMethod(registryUrl);
}
// Otherwise, rethrow the error
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
index 854486e0343b..c9698feb9ad8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
@@ -21,9 +21,11 @@ package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
+import lombok.Getter;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
+@Getter
public class SimpleSchemaProvider extends SchemaProvider {
private final Schema sourceSchema;
@@ -32,9 +34,4 @@ public class SimpleSchemaProvider extends SchemaProvider {
super(props, jssc);
this.sourceSchema = sourceSchema.toAvroSchema();
}
-
- @Override
- public Schema getSourceSchema() {
- return sourceSchema;
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
index acc9ad78cb2d..cb2079a9d25c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
@@ -26,10 +26,9 @@ import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -38,10 +37,9 @@ import java.util.List;
* An implementation of {@link SchemaPostProcessor} which will add a column
named "_hoodie_is_deleted" to the end of
* a given schema.
*/
+@Slf4j
public class DeleteSupportSchemaPostProcessor extends SchemaPostProcessor {
- private static final Logger LOG =
LoggerFactory.getLogger(DeleteSupportSchemaPostProcessor.class);
-
public DeleteSupportSchemaPostProcessor(TypedProperties props,
JavaSparkContext jssc) {
super(props, jssc);
}
@@ -55,7 +53,7 @@ public class DeleteSupportSchemaPostProcessor extends
SchemaPostProcessor {
@Override
public HoodieSchema processSchema(HoodieSchema schema) {
if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).isPresent()) {
- LOG.warn("column {} already exists!",
HoodieRecord.HOODIE_IS_DELETED_FIELD);
+ log.warn("column {} already exists!",
HoodieRecord.HOODIE_IS_DELETED_FIELD);
return schema;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
index 703dea14544f..bb3de9344ea7 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
@@ -27,10 +27,9 @@ import
org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.LinkedList;
@@ -49,10 +48,9 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
* <p>
*
properties.put("hoodie.streamer.schemaprovider.schema_post_processor.delete.columns",
"column1,column2").
*/
+@Slf4j
public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
- private static final Logger LOG =
LoggerFactory.getLogger(DropColumnSchemaPostProcessor.class);
-
public DropColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext
jssc) {
super(props, jssc);
}
@@ -77,7 +75,7 @@ public class DropColumnSchemaPostProcessor extends
SchemaPostProcessor {
this.config,
SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN);
if (StringUtils.isNullOrEmpty(columnToDeleteStr)) {
- LOG.warn("Param {} is null or empty, return original schema",
+ log.warn("Param {} is null or empty, return original schema",
SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key());
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index 726770e5ad4a..79ad5ec3dcf1 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -27,6 +27,8 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -141,6 +143,8 @@ public class ChainedTransformer implements Transformer {
}
protected static class TransformerInfo {
+
+ @Getter(AccessLevel.PROTECTED)
private final Transformer transformer;
private final Option<String> idOpt;
@@ -154,10 +158,6 @@ public class ChainedTransformer implements Transformer {
this.idOpt = Option.empty();
}
- protected Transformer getTransformer() {
- return transformer;
- }
-
private boolean hasIdentifier() {
return idOpt.isPresent();
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
index 1256491528d1..76f21a308e39 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -21,24 +21,23 @@ package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.UUID;
/**
* Transformer that can flatten nested objects. It currently doesn't unnest
arrays.
*/
+@Slf4j
public class FlatteningTransformer implements Transformer {
private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
- private static final Logger LOG =
LoggerFactory.getLogger(FlatteningTransformer.class);
/**
* Configs supported.
@@ -49,7 +48,7 @@ public class FlatteningTransformer implements Transformer {
try {
// tmp table name doesn't like dashes
String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table : " + tmpTable);
+ log.info("Registering tmp table: {}", tmpTable);
rowDataset.createOrReplaceTempView(tmpTable);
Dataset<Row> transformed = sparkSession.sql("select " +
flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
sparkSession.catalog().dropTempView(tmpTable);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
index cdef1677e587..1e8af287f809 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
@@ -23,14 +23,13 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.config.SqlTransformerConfig;
import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Scanner;
@@ -56,10 +55,9 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
* <p>
* SELECT * FROM tmp_personal_trips;
*/
+@Slf4j
public class SqlFileBasedTransformer implements Transformer {
- private static final Logger LOG =
LoggerFactory.getLogger(SqlFileBasedTransformer.class);
-
private static final String SRC_PATTERN = "<SRC>";
private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
@@ -75,19 +73,19 @@ public class SqlFileBasedTransformer implements Transformer
{
final FileSystem fs = HadoopFSUtils.getFs(sqlFile,
jsc.hadoopConfiguration(), true);
// tmp table name doesn't like dashes
final String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table: {}", tmpTable);
+ log.info("Registering tmp table: {}", tmpTable);
rowDataset.createOrReplaceTempView(tmpTable);
try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)),
"UTF-8")) {
Dataset<Row> rows = null;
// each sql statement is separated with semicolon hence set that as
delimiter.
scanner.useDelimiter(";");
- LOG.info("SQL Query for transformation:");
+ log.info("SQL Query for transformation:");
while (scanner.hasNext()) {
String sqlStr = scanner.next();
sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim();
if (!sqlStr.isEmpty()) {
- LOG.info(sqlStr);
+ log.info(sqlStr);
// overwrite the same dataset object until the last statement then
return.
rows = sparkSession.sql(sqlStr);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index 290e3a69432a..9013ab04b9b3 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -22,12 +22,11 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.SqlTransformerConfig;
import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.UUID;
@@ -38,10 +37,9 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
*
* The query should reference the source as a table named "\<SRC\>"
*/
+@Slf4j
public class SqlQueryBasedTransformer implements Transformer {
- private static final Logger LOG =
LoggerFactory.getLogger(SqlQueryBasedTransformer.class);
-
private static final String SRC_PATTERN = "<SRC>";
private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
@@ -53,10 +51,10 @@ public class SqlQueryBasedTransformer implements
Transformer {
try {
// tmp table name doesn't like dashes
String tmpTable =
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table: {}", tmpTable);
+ log.info("Registering tmp table: {}", tmpTable);
rowDataset.createOrReplaceTempView(tmpTable);
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
- LOG.debug("SQL Query for transformation: {}", sqlStr);
+ log.debug("SQL Query for transformation: {}", sqlStr);
Dataset<Row> transformed = sparkSession.sql(sqlStr);
sparkSession.catalog().dropTempView(tmpTable);
return transformed;