This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 095e80a9c256 refactor: Add Lombok annotations to hudi-utilities (Part 1) (#17823)
095e80a9c256 is described below

commit 095e80a9c256959d3802b0a138e48e72747940a9
Author: voonhous <[email protected]>
AuthorDate: Tue Jun 2 15:46:24 2026 +0800

    refactor: Add Lombok annotations to hudi-utilities (Part 1) (#17823)
    
    * refactor: Add Lombok annotations to hudi-utilities (Part 1)
---
 hudi-utilities/pom.xml                             |   1 +
 .../hudi/utilities/HiveIncrementalPuller.java      |  53 ++++-----
 .../org/apache/hudi/utilities/HoodieCleaner.java   |  10 +-
 .../apache/hudi/utilities/HoodieClusteringJob.java |  36 +++---
 .../org/apache/hudi/utilities/HoodieCompactor.java |  43 ++++---
 .../hudi/utilities/HoodieDataTableValidator.java   |  36 +++---
 .../hudi/utilities/HoodieDropPartitionsTool.java   |  31 +++--
 .../org/apache/hudi/utilities/HoodieIndexer.java   |  36 +++---
 .../utilities/HoodieMetadataTableValidator.java    | 132 +++++++++------------
 .../apache/hudi/utilities/HoodieRepairTool.java    |  57 ++++-----
 .../hudi/utilities/HoodieSnapshotExporter.java     |  21 ++--
 .../org/apache/hudi/utilities/HoodieTTLJob.java    |   9 +-
 .../org/apache/hudi/utilities/TableSizeStats.java  |  56 ++++-----
 .../org/apache/hudi/utilities/UtilHelpers.java     |  59 +++++----
 .../kafka/HoodieWriteCommitKafkaCallback.java      |  18 ++-
 .../pulsar/HoodieWriteCommitPulsarCallback.java    |  14 +--
 .../deser/KafkaAvroSchemaDeserializer.java         |   5 +-
 .../ingestion/HoodieIngestionException.java        |   6 +-
 .../ingestion/HoodieIngestionService.java          |  18 ++-
 .../hudi/utilities/multitable/ArchiveTask.java     |   7 +-
 .../hudi/utilities/multitable/ClusteringTask.java  |   6 +-
 .../multitable/HoodieMultiTableServicesMain.java   |  12 +-
 .../multitable/MultiTableServiceUtils.java         |   7 +-
 .../hudi/utilities/perf/TimelineServerPerf.java    |   9 +-
 .../utilities/schema/DelegatingSchemaProvider.java |  10 +-
 .../utilities/schema/FilebasedSchemaProvider.java  |   7 +-
 .../hudi/utilities/schema/HiveSchemaProvider.java  |  18 +--
 .../utilities/schema/SchemaRegistryProvider.java   |   8 +-
 .../utilities/schema/SimpleSchemaProvider.java     |   7 +-
 .../DeleteSupportSchemaPostProcessor.java          |   8 +-
 .../DropColumnSchemaPostProcessor.java             |   8 +-
 .../utilities/transform/ChainedTransformer.java    |   8 +-
 .../utilities/transform/FlatteningTransformer.java |   7 +-
 .../transform/SqlFileBasedTransformer.java         |  12 +-
 .../transform/SqlQueryBasedTransformer.java        |  10 +-
 35 files changed, 357 insertions(+), 428 deletions(-)

diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 221eb7c7793b..5f8c620b5291 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -234,6 +234,7 @@
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <!-- Fasterxml -->
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index 2510edce72a8..538253c9f60c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -28,13 +28,12 @@ import 
org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;
 
 import java.io.File;
@@ -62,10 +61,9 @@ import static 
org.apache.hudi.io.util.FileIOUtils.readAsUTFString;
  * - Only the source table can be incrementally pulled (usually the largest 
table) - The incrementally pulled table
  * can't be referenced more than once.
  */
+@Slf4j
 public class HiveIncrementalPuller {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIncrementalPuller.class);
-
   public static class Config implements Serializable {
 
     @Parameter(names = {"--hiveUrl"})
@@ -133,15 +131,14 @@ public class HiveIncrementalPuller {
       incrementalSQL = scanner.useDelimiter("\\Z").next();
     }
     if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
-      LOG.error("Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable
-          + ", which means its pulling from a different table. Fencing this 
from happening.");
+      log.error("Incremental SQL does not have {}.{}, which means its pulling 
from a different table. Fencing this from happening.",
+          config.sourceDb, config.sourceTable);
       throw new HoodieIncrementalPullSQLException(
           "Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable);
     }
     if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
-      LOG.error("Incremental SQL : " + incrementalSQL
-          + " does not contain `_hoodie_commit_time` > '%s'. Please add "
-          + "this clause for incremental to work properly.");
+      log.error("Incremental SQL : {} does not contain `_hoodie_commit_time` > 
'%s'. Please add this clause for incremental to work properly.",
+          incrementalSQL);
       throw new HoodieIncrementalPullSQLException(
           "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', 
which "
               + "means its not pulling incrementally");
@@ -157,14 +154,14 @@ public class HiveIncrementalPuller {
     try {
       if (config.fromCommitTime == null) {
         config.fromCommitTime = inferCommitTime(fs);
-        LOG.info("FromCommitTime inferred as " + config.fromCommitTime);
+        log.info("FromCommitTime inferred as {}", config.fromCommitTime);
       }
 
-      LOG.info("FromCommitTime - " + config.fromCommitTime);
+      log.info("FromCommitTime - {}", config.fromCommitTime);
       String sourceTableLocation = getTableLocation(config.sourceDb, 
config.sourceTable);
       String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation);
       if (lastCommitTime == null) {
-        LOG.info("Nothing to pull. However we will continue to create a empty 
table");
+        log.info("Nothing to pull. However we will continue to create a empty 
table");
         lastCommitTime = config.fromCommitTime;
       }
 
@@ -182,9 +179,9 @@ public class HiveIncrementalPuller {
 
       initHiveBeelineProperties(stmt);
       executeIncrementalSQL(tempDbTable, tempDbTablePath, stmt);
-      LOG.info("Finished HoodieReader execution");
+      log.info("Finished HoodieReader execution");
     } catch (SQLException e) {
-      LOG.error("Exception when executing SQL", e);
+      log.error("Exception when executing SQL", e);
       throw new IOException("Could not scan " + config.sourceTable + " 
incrementally", e);
     } finally {
       try {
@@ -192,14 +189,14 @@ public class HiveIncrementalPuller {
           stmt.close();
         }
       } catch (SQLException e) {
-        LOG.error("Could not close the resultSet opened ", e);
+        log.error("Could not close the resultSet opened ", e);
       }
       try {
         if (this.connection != null) {
           this.connection.close();
         }
       } catch (SQLException e) {
-        LOG.error("Could not close the JDBC connection", e);
+        log.error("Could not close the JDBC connection", e);
       } finally {
         this.connection = null;
       }
@@ -229,7 +226,7 @@ public class HiveIncrementalPuller {
   }
 
   private void initHiveBeelineProperties(Statement stmt) throws SQLException {
-    LOG.info("Setting up Hive JDBC Session with properties");
+    log.info("Setting up Hive JDBC Session with properties");
     // set the queue
     executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, 
stmt);
     // Set the inputFormat to HoodieCombineHiveInputFormat
@@ -247,18 +244,18 @@ public class HiveIncrementalPuller {
   }
 
   private boolean deleteHDFSPath(FileSystem fs, String path) throws 
IOException {
-    LOG.info("Deleting path " + path);
+    log.info("Deleting path {}", path);
     return fs.delete(new Path(path), true);
   }
 
   private void executeStatement(String sql, Statement stmt) throws 
SQLException {
-    LOG.info("Executing: " + sql);
+    log.info("Executing: {}", sql);
     stmt.execute(sql);
   }
 
   private String inferCommitTime(FileSystem fs) throws IOException {
-    LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie 
table " + config.targetDb + "."
-        + config.targetTable);
+    log.info("FromCommitTime not specified. Trying to infer it from Hoodie 
table {}.{}",
+        config.targetDb, config.targetTable);
     String targetDataLocation = getTableLocation(config.targetDb, 
config.targetTable);
     return scanForCommitTime(fs, targetDataLocation);
   }
@@ -272,7 +269,7 @@ public class HiveIncrementalPuller {
       resultSet = stmt.executeQuery("describe formatted `" + db + "." + table 
+ "`");
       while (resultSet.next()) {
         if (resultSet.getString(1).trim().equals("Location:")) {
-          LOG.info("Inferred table location for " + db + "." + table + " as " 
+ resultSet.getString(2));
+          log.info("Inferred table location for {}.{} as {}", db, table, 
resultSet.getString(2));
           return resultSet.getString(2);
         }
       }
@@ -287,7 +284,7 @@ public class HiveIncrementalPuller {
           resultSet.close();
         }
       } catch (SQLException e) {
-        LOG.error("Could not close the resultSet opened ", e);
+        log.error("Could not close the resultSet opened ", e);
       }
     }
     return null;
@@ -314,7 +311,7 @@ public class HiveIncrementalPuller {
   private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) 
throws IOException {
     Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable 
+ "__" + config.sourceTable);
     if (!fs.exists(targetBaseDirPath)) {
-      LOG.info("Creating " + targetBaseDirPath + " with permission 
drwxrwxrwx");
+      log.info("Creating {} with permission drwxrwxrwx", targetBaseDirPath);
       boolean result =
           FileSystem.mkdirs(fs, targetBaseDirPath, new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
       if (!result) {
@@ -329,7 +326,7 @@ public class HiveIncrementalPuller {
         throw new HoodieException("Could not delete existing " + targetPath);
       }
     }
-    LOG.info("Creating " + targetPath + " with permission drwxrwxrwx");
+    log.info("Creating {} with permission drwxrwxrwx", targetPath);
     return FileSystem.mkdirs(fs, targetBaseDirPath, new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
   }
 
@@ -341,17 +338,17 @@ public class HiveIncrementalPuller {
         .findInstantsAfter(config.fromCommitTime, 
config.maxCommits).getInstantsAsStream().map(HoodieInstant::requestedTime)
         .collect(Collectors.toList());
     if (commitsToSync.isEmpty()) {
-      LOG.info("Nothing to sync. All commits in {} are {} and from commit time 
is {}", config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
+      log.info("Nothing to sync. All commits in {} are {} and from commit time 
is {}", config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
           .filterCompletedInstants().getInstants(), config.fromCommitTime);
       return null;
     }
-    LOG.info("Syncing commits {}", commitsToSync);
+    log.info("Syncing commits {}", commitsToSync);
     return commitsToSync.get(commitsToSync.size() - 1);
   }
 
   private Connection getConnection() throws SQLException {
     if (connection == null) {
-      LOG.info("Getting Hive Connection to {}", config.hiveJDBCUrl);
+      log.info("Getting Hive Connection to {}", config.hiveJDBCUrl);
       this.connection = DriverManager.getConnection(config.hiveJDBCUrl, 
config.hiveUsername, config.hivePassword);
 
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index 63ec3354410d..90b0c7dc1515 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -27,19 +27,17 @@ import org.apache.hudi.exception.HoodieException;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 public class HoodieCleaner {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCleaner.class);
-
   /**
    * Config for Cleaner.
    */
@@ -63,7 +61,7 @@ public class HoodieCleaner {
     this.cfg = cfg;
     this.jssc = jssc;
     this.props = props;
-    LOG.info("Creating Cleaner with configs : " + props.toString());
+    log.info("Creating Cleaner with configs : {}", props.toString());
   }
 
   public void run() {
@@ -124,6 +122,6 @@ public class HoodieCleaner {
       SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, 
exitCode);
     }
 
-    LOG.info("Cleaner ran successfully");
+    log.info("Cleaner ran successfully");
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index c378bce9026c..67e35aea7831 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -37,9 +37,8 @@ import org.apache.hudi.exception.HoodieException;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -53,9 +52,9 @@ import static 
org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 
+@Slf4j
 public class HoodieClusteringJob {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieClusteringJob.class);
   private final Config cfg;
   private final TypedProperties props;
   private final JavaSparkContext jsc;
@@ -168,7 +167,7 @@ public class HoodieClusteringJob {
     if (result != 0) {
       throw new HoodieException(resultMsg + " failed");
     }
-    LOG.info(resultMsg + " success");
+    log.info("{} success", resultMsg);
     jsc.stop();
   }
 
@@ -187,28 +186,28 @@ public class HoodieClusteringJob {
     return UtilHelpers.retry(retry, () -> {
       switch (cfg.runningMode.toLowerCase()) {
         case SCHEDULE: {
-          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          log.info("Running Mode: [{}]; Do schedule", SCHEDULE);
           Option<String> instantTime = doSchedule(jsc);
           int result = instantTime.isPresent() ? 0 : -1;
           if (result == 0) {
-            LOG.info("The schedule instant time is " + instantTime.get());
+            log.info("The schedule instant time is {}", instantTime.get());
           }
           return result;
         }
         case SCHEDULE_AND_EXECUTE: {
-          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          log.info("Running Mode: [{}]", SCHEDULE_AND_EXECUTE);
           return doScheduleAndCluster(jsc);
         }
         case EXECUTE: {
-          LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
+          log.info("Running Mode: [{}]; Do cluster", EXECUTE);
           return doCluster(jsc);
         }
         case PURGE_PENDING_INSTANT: {
-          LOG.info("Running Mode: [" + PURGE_PENDING_INSTANT + "];");
+          log.info("Running Mode: [{}];", PURGE_PENDING_INSTANT);
           return doPurgePendingInstant(jsc);
         }
         default: {
-          LOG.error("Unsupported running mode [" + cfg.runningMode + "], quit 
the job directly");
+          log.error("Unsupported running mode [{}], quit the job directly", 
cfg.runningMode);
           return -1;
         }
       }
@@ -224,10 +223,9 @@ public class HoodieClusteringJob {
             metaClient.getActiveTimeline().getFirstPendingClusterInstant();
         if (firstClusteringInstant.isPresent()) {
           cfg.clusteringInstantTime = 
firstClusteringInstant.get().requestedTime();
-          LOG.info("Found the earliest scheduled clustering instant which will 
be executed: "
-              + cfg.clusteringInstantTime);
+          log.info("Found the earliest scheduled clustering instant which will 
be executed: {}", cfg.clusteringInstantTime);
         } else {
-          LOG.info("There is no scheduled clustering in the table.");
+          log.info("There is no scheduled clustering in the table.");
           return 0;
         }
       }
@@ -255,7 +253,7 @@ public class HoodieClusteringJob {
   }
 
   private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
-    LOG.info("Step 1: Do schedule");
+    log.info("Step 1: Do schedule");
     metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = 
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, 
Option.empty(), props)) {
@@ -265,19 +263,19 @@ public class HoodieClusteringJob {
         Option<HoodieInstant> staleInstant = 
TableServiceUtils.findStaleInflightInstant(
             metaClient, HoodieTimeline.CLUSTERING_ACTION, 
cfg.maxProcessingTimeMs);
         if (staleInstant.isPresent()) {
-          LOG.info("Found failed clustering instant at : " + 
staleInstant.get() + "; Will rollback the failed clustering and re-trigger 
again.");
+          log.info("Found failed clustering instant at : {}; Will rollback the 
failed clustering and re-trigger again.", staleInstant.get());
           instantTime = Option.of(staleInstant.get().requestedTime());
         }
       }
 
       instantTime = instantTime.isPresent() ? instantTime : doSchedule(client);
       if (!instantTime.isPresent()) {
-        LOG.info("Couldn't generate cluster plan");
+        log.info("Couldn't generate cluster plan");
         return -1;
       }
 
-      LOG.info("The schedule instant time is " + instantTime.get());
-      LOG.info("Step 2: Do cluster");
+      log.info("The schedule instant time is {}", instantTime.get());
+      log.info("Step 2: Do cluster");
       Option<HoodieCommitMetadata> metadata = 
client.cluster(instantTime.get()).getCommitMetadata();
       clean(client);
       return UtilHelpers.handleErrors(metadata.get(), instantTime.get());
@@ -345,7 +343,7 @@ public class HoodieClusteringJob {
       metaClient.reloadActiveTimeline();
       if (metaClient.getActiveTimeline().filterInflightsAndRequested()
           .containsInstant(instant.requestedTime())) {
-        LOG.info("Rolling back expired clustering instant {}", 
instant.requestedTime());
+        log.info("Rolling back expired clustering instant {}", 
instant.requestedTime());
         client.rollback(instant.requestedTime());
       }
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 2701ce7c9a94..fe64c95a364f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -38,20 +38,19 @@ import 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionS
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+@Slf4j
 public class HoodieCompactor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCompactor.class);
   public static final String EXECUTE = "execute";
   public static final String SCHEDULE = "schedule";
   public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
@@ -75,11 +74,12 @@ public class HoodieCompactor {
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
       // add default lock config options if MDT is enabled.
-      UtilHelpers.addLockOptions(cfg.basePath, 
this.metaClient.getBasePath().toUri().getScheme(),  this.props);
+      UtilHelpers.addLockOptions(cfg.basePath, 
this.metaClient.getBasePath().toUri().getScheme(), this.props);
     }
   }
 
   public static class Config implements Serializable {
+
     @Parameter(names = {"--base-path", "-sp"}, description = "Base path for 
the table", required = true)
     public String basePath = null;
     @Parameter(names = {"--table-name", "-tn"}, description = "Table name", 
required = true)
@@ -196,7 +196,7 @@ public class HoodieCompactor {
     if (ret != 0) {
       throw new HoodieException("Fail to run compaction for " + cfg.tableName 
+ ", return code: " + ret);
     }
-    LOG.info("Success to run compaction for " + cfg.tableName);
+    log.info("Success to run compaction for {}", cfg.tableName);
     jsc.stop();
   }
 
@@ -204,29 +204,29 @@ public class HoodieCompactor {
     this.fs = HadoopFSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
     // need to do validate in case that users call compact() directly without 
setting cfg.runningMode
     validateRunningMode(cfg);
-    LOG.info(cfg.toString());
+    log.info(cfg.toString());
 
     int ret = UtilHelpers.retry(retry, () -> {
       switch (cfg.runningMode.toLowerCase()) {
         case SCHEDULE: {
-          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          log.info("Running Mode: [{}] Do schedule", SCHEDULE);
           Option<String> instantTime = doSchedule(jsc);
           int result = instantTime.isPresent() ? 0 : -1;
           if (result == 0) {
-            LOG.info("The schedule instant time is " + instantTime.get());
+            log.info("The schedule instant time is {}", instantTime.get());
           }
           return result;
         }
         case SCHEDULE_AND_EXECUTE: {
-          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          log.info("Running Mode: [{}]", SCHEDULE_AND_EXECUTE);
           return doScheduleAndCompact(jsc);
         }
         case EXECUTE: {
-          LOG.info("Running Mode: [" + EXECUTE + "]; Do compaction");
+          log.info("Running Mode: [{}]; Do compaction", EXECUTE);
           return doCompact(jsc);
         }
         default: {
-          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit 
the job directly");
+          log.info("Unsupported running mode [{}], quit the job directly", 
cfg.runningMode);
           return -1;
         }
       }
@@ -235,7 +235,7 @@ public class HoodieCompactor {
   }
 
   private Integer doScheduleAndCompact(JavaSparkContext jsc) throws Exception {
-    LOG.info("Step 1: Do schedule");
+    log.info("Step 1: Do schedule");
     metaClient = HoodieTableMetaClient.reload(metaClient);
     Option<String> instantTime = Option.empty();
 
@@ -243,20 +243,20 @@ public class HoodieCompactor {
       Option<HoodieInstant> staleInstant = 
TableServiceUtils.findStaleInflightInstant(
           metaClient, HoodieTimeline.COMPACTION_ACTION, 
cfg.maxProcessingTimeMs);
       if (staleInstant.isPresent()) {
-        LOG.info("Found failed compaction instant at : " + staleInstant.get() 
+ "; Will rollback the failed compaction and re-trigger again.");
+        log.info("Found failed compaction instant at : {}; Will rollback the 
failed compaction and re-trigger again.", staleInstant.get());
         instantTime = Option.of(staleInstant.get().requestedTime());
       }
     }
 
     instantTime = instantTime.isPresent() ? instantTime : doSchedule(jsc);
     if (!instantTime.isPresent()) {
-      LOG.error("Couldn't do schedule");
+      log.error("Couldn't do schedule");
       return -1;
     }
     cfg.compactionInstantTime = instantTime.get();
 
-    LOG.info("The schedule instant time is {}", instantTime.get());
-    LOG.info("Step 2: Do compaction");
+    log.info("The schedule instant time is {}", instantTime.get());
+    log.info("Step 2: Do compaction");
 
     return doCompact(jsc);
   }
@@ -278,10 +278,10 @@ public class HoodieCompactor {
     } else {
       schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
     }
-    LOG.info("Schema --> : " + schemaStr);
+    log.info("Schema --> : {}", schemaStr);
 
     try (SparkRDDWriteClient<HoodieRecordPayload> client =
-             UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, 
cfg.parallelism, Option.empty(), props)) {
+        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, 
cfg.parallelism, Option.empty(), props)) {
       // If no compaction instant is provided by --instant-time, find the 
earliest scheduled compaction
       // instant from the active timeline
       if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
@@ -289,10 +289,9 @@ public class HoodieCompactor {
         Option<HoodieInstant> firstCompactionInstant = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
         if (firstCompactionInstant.isPresent()) {
           cfg.compactionInstantTime = 
firstCompactionInstant.get().requestedTime();
-          LOG.info("Found the earliest scheduled compaction instant which will 
be executed: "
-              + cfg.compactionInstantTime);
+          log.info("Found the earliest scheduled compaction instant which will 
be executed: {}", cfg.compactionInstantTime);
         } else {
-          LOG.info("There is no scheduled compaction in the table.");
+          log.info("There is no scheduled compaction in the table.");
           return 0;
         }
       }
@@ -305,7 +304,7 @@ public class HoodieCompactor {
 
   private Option<String> doSchedule(JavaSparkContext jsc) {
     try (SparkRDDWriteClient client =
-             UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", 
cfg.parallelism, Option.of(cfg.strategyClassName), props)) {
+        UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, 
Option.of(cfg.strategyClassName), props)) {
 
       return client.scheduleCompaction(Option.empty());
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
index 521f94bbafc5..f34fe8650401 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
@@ -38,10 +38,9 @@ import org.apache.hudi.table.repair.RepairUtils;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -98,10 +97,10 @@ import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
  * --min-validate-interval-seconds 60
  * ```
  */
+@Slf4j
 public class HoodieDataTableValidator implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDataTableValidator.class);
 
   // Spark context
   private transient JavaSparkContext jsc;
@@ -253,7 +252,7 @@ public class HoodieDataTableValidator implements 
Serializable {
     try {
       validator.run();
     } catch (Throwable throwable) {
-      LOG.error("Fail to do hoodie Data table validation for " + 
validator.cfg, throwable);
+      log.error("Fail to do hoodie Data table validation for " + 
validator.cfg, throwable);
     } finally {
       jsc.stop();
     }
@@ -261,12 +260,12 @@ public class HoodieDataTableValidator implements 
Serializable {
 
   public void run() {
     try {
-      LOG.info(cfg.toString());
+      log.info(cfg.toString());
       if (cfg.continuous) {
-        LOG.info(" ****** do hoodie data table validation in CONTINUOUS mode 
******");
+        log.info(" ****** do hoodie data table validation in CONTINUOUS mode 
******");
         doHoodieDataTableValidationContinuous();
       } else {
-        LOG.info(" ****** do hoodie data table validation once ******");
+        log.info(" ****** do hoodie data table validation once ******");
         doHoodieDataTableValidationOnce();
       }
     } catch (Exception e) {
@@ -283,7 +282,7 @@ public class HoodieDataTableValidator implements 
Serializable {
     try {
       doDataTableValidation();
     } catch (HoodieValidationException e) {
-      LOG.error("Metadata table validation failed to 
HoodieValidationException", e);
+      log.error("Metadata table validation failed to 
HoodieValidationException", e);
       if (!cfg.ignoreFailed) {
         throw e;
       }
@@ -320,9 +319,8 @@ public class HoodieDataTableValidator implements 
Serializable {
         }).collect(Collectors.toList());
 
         if (!danglingFilePaths.isEmpty() && danglingFilePaths.size() > 0) {
-          LOG.error("Data table validation failed due to dangling files count "
-              + danglingFilePaths.size() + ", found before active timeline");
-          danglingFilePaths.forEach(entry -> LOG.error("Dangling file: " + 
entry.toString()));
+          log.error("Data table validation failed due to dangling files count 
{}, found before active timeline", danglingFilePaths.size());
+          danglingFilePaths.forEach(entry -> log.error("Dangling file: " + 
entry.toString()));
           finalResult = false;
           if (!cfg.ignoreFailed) {
             throw new HoodieValidationException(
@@ -354,8 +352,8 @@ public class HoodieDataTableValidator implements 
Serializable {
         }, hoodieInstants.size()).stream().collect(Collectors.toList());
 
         if (!danglingFiles.isEmpty()) {
-          LOG.error("Data table validation failed due to extra files found for 
completed commits {}", danglingFiles.size());
-          danglingFiles.forEach(entry -> LOG.error("Dangling file: {}", 
entry));
+          log.error("Data table validation failed due to extra files found for 
completed commits {}", danglingFiles.size());
+          danglingFiles.forEach(entry -> log.error("Dangling file: {}", 
entry));
           finalResult = false;
           if (!cfg.ignoreFailed) {
             throw new HoodieValidationException("Data table validation failed 
due to dangling files " + danglingFiles.size());
@@ -363,16 +361,16 @@ public class HoodieDataTableValidator implements 
Serializable {
         }
       }
     } catch (Exception e) {
-      LOG.error("Data table validation failed", e);
+      log.error("Data table validation failed", e);
       if (!cfg.ignoreFailed) {
         throw new HoodieValidationException("Data table validation failed due 
to " + e.getMessage(), e);
       }
     }
 
     if (finalResult) {
-      LOG.info("Data table validation succeeded.");
+      log.info("Data table validation succeeded.");
     } else {
-      LOG.error("Data table validation failed.");
+      log.error("Data table validation failed.");
     }
   }
 
@@ -389,12 +387,12 @@ public class HoodieDataTableValidator implements 
Serializable {
             long toSleepMs = cfg.minValidateIntervalSeconds * 1000 - 
(System.currentTimeMillis() - start);
 
             if (toSleepMs > 0) {
-              LOG.info("Last validate ran less than min validate interval: " + 
cfg.minValidateIntervalSeconds + " s, sleep: "
-                  + toSleepMs + " ms.");
+              log.info("Last validate ran less than min validate interval: {} 
s, sleep: {} ms.",
+                  cfg.minValidateIntervalSeconds, toSleepMs);
               Thread.sleep(toSleepMs);
             }
           } catch (HoodieValidationException e) {
-            LOG.error("Shutting down AsyncDataTableValidateService due to 
HoodieValidationException", e);
+            log.error("Shutting down AsyncDataTableValidateService due to 
HoodieValidationException", e);
             if (!cfg.ignoreFailed) {
               throw e;
             }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index 9e2e7d035b9a..b9dadae7d87a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -38,13 +38,12 @@ import org.apache.hudi.table.HoodieSparkTable;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -101,10 +100,10 @@ import scala.Tuple2;
  *
  * Also you can use --help to find more configs to use.
  */
+@Slf4j
 public class HoodieDropPartitionsTool implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDropPartitionsTool.class);
   // Spark context
   private final transient JavaSparkContext jsc;
   // config
@@ -282,7 +281,7 @@ public class HoodieDropPartitionsTool implements 
Serializable {
     try {
       tool.run();
     } catch (Throwable throwable) {
-      LOG.error("Fail to run deleting table partitions for " + cfg, throwable);
+      log.error("Fail to run deleting table partitions for {}", cfg, 
throwable);
     } finally {
       jsc.stop();
     }
@@ -290,21 +289,21 @@ public class HoodieDropPartitionsTool implements 
Serializable {
 
   public void run() {
     try {
-      LOG.info(cfg.toString());
+      log.info(cfg.toString());
 
       Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
       switch (mode) {
         case DELETE:
-          LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete mode 
****** ");
+          log.info(" ****** The Hoodie Drop Partitions Tool is in delete mode 
****** ");
           doDeleteTablePartitions();
           syncToHiveIfNecessary();
           break;
         case DRY_RUN:
-          LOG.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode 
****** ");
+          log.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode 
****** ");
           dryRun();
           break;
         default:
-          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit 
the job directly");
+          log.info("Unsupported running mode [{}], quit the job directly", 
cfg.runningMode);
       }
     } catch (Exception e) {
       throw new HoodieException("Unable to delete table partitions in " + 
cfg.basePath, e);
@@ -368,19 +367,17 @@ public class HoodieDropPartitionsTool implements 
Serializable {
   }
 
   private void syncHive(HiveSyncConfig hiveSyncConfig) {
-    LOG.info("Syncing target hoodie table with hive table("
-        + 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME)
-        + "). Hive metastore URL :"
-        + hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_URL)
-        + ", basePath :" + cfg.basePath);
-    LOG.info("Hive Sync Conf => " + hiveSyncConfig);
+    log.info("Syncing target hoodie table with hive table({}). Hive metastore 
URL :{}, basePath :{}",
+        
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+        hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_URL), 
cfg.basePath);
+    log.info("Hive Sync Conf => {}", hiveSyncConfig);
     FileSystem fs = HadoopFSUtils.getFs(cfg.basePath, 
jsc.hadoopConfiguration());
     HiveConf hiveConf = new HiveConf();
     if (!StringUtils.isNullOrEmpty(cfg.hiveHMSUris)) {
       hiveConf.set("hive.metastore.uris", cfg.hiveHMSUris);
     }
     hiveConf.addResource(fs.getConf());
-    LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
+    log.info("Hive Conf => {}", hiveConf.getAllProperties().toString());
     try (HiveSyncTool hiveSyncTool = new 
HiveSyncTool(hiveSyncConfig.getProps(), hiveConf)) {
       hiveSyncTool.syncHoodieTable();
     }
@@ -392,9 +389,9 @@ public class HoodieDropPartitionsTool implements 
Serializable {
    * @param partitionToReplaceFileIds
    */
   private void printDeleteFilesInfo(Map<String, List<String>> 
partitionToReplaceFileIds) {
-    LOG.info("Data files and partitions to delete : ");
+    log.info("Data files and partitions to delete : ");
     for (Map.Entry<String, List<String>> entry  : 
partitionToReplaceFileIds.entrySet()) {
-      LOG.info(String.format("Partitions : %s, corresponding data file IDs : 
%s", entry.getKey(), entry.getValue()));
+      log.info("Partitions : {}, corresponding data file IDs : {}", 
entry.getKey(), entry.getValue());
     }
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index b3d743693eec..2c7bab98900f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -35,10 +35,9 @@ import org.apache.hudi.metadata.MetadataPartitionType;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -87,9 +86,9 @@ import static 
org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
  * hoodie.write.concurrency.mode=optimistic_concurrency_control
  * 
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
  */
+@Slf4j
 public class HoodieIndexer {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieIndexer.class);
   static final String DROP_INDEX = "dropindex";
 
   private final HoodieIndexer.Config cfg;
@@ -165,21 +164,21 @@ public class HoodieIndexer {
     if (result != 0) {
       throw new HoodieException(resultMsg + " failed");
     }
-    LOG.info(resultMsg + " success");
+    log.info("{} success", resultMsg);
     jsc.stop();
   }
 
   public int start(int retry) {
     // indexing should be done only if metadata is enabled
     if (!props.getBoolean(HoodieMetadataConfig.ENABLE.key())) {
-      LOG.error(String.format("Metadata is not enabled. Please set %s to 
true.", HoodieMetadataConfig.ENABLE.key()));
+      log.error("Metadata is not enabled. Please set {} to true.", 
HoodieMetadataConfig.ENABLE.key());
       return -1;
     }
 
     // all inflight or completed metadata partitions have already been 
initialized
     // so enable corresponding indexes in the props so that they're not deleted
     Set<String> initializedMetadataPartitions = 
getInflightAndCompletedMetadataPartitions(metaClient.getTableConfig());
-    LOG.info("Setting props for: " + initializedMetadataPartitions);
+    log.info("Setting props for: {}", initializedMetadataPartitions);
     initializedMetadataPartitions.forEach(p -> {
       if (PARTITION_NAME_COLUMN_STATS.equals(p)) {
         props.setProperty(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
@@ -195,28 +194,28 @@ public class HoodieIndexer {
     return UtilHelpers.retry(retry, () -> {
       switch (cfg.runningMode.toLowerCase()) {
         case SCHEDULE: {
-          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          log.info("Running Mode: [{}]; Do schedule", SCHEDULE);
           Option<String> instantTime = scheduleIndexing(jsc);
           int result = instantTime.isPresent() ? 0 : -1;
           if (result == 0) {
-            LOG.info("The schedule instant time is " + instantTime.get());
+            log.info("The schedule instant time is {}", instantTime.get());
           }
           return result;
         }
         case SCHEDULE_AND_EXECUTE: {
-          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          log.info("Running Mode: [{}]", SCHEDULE_AND_EXECUTE);
           return scheduleAndRunIndexing(jsc);
         }
         case EXECUTE: {
-          LOG.info("Running Mode: [" + EXECUTE + "];");
+          log.info("Running Mode: [{}];", EXECUTE);
           return runIndexing(jsc);
         }
         case DROP_INDEX: {
-          LOG.info("Running Mode: [" + DROP_INDEX + "];");
+          log.info("Running Mode: [{}];", DROP_INDEX);
           return dropIndex(jsc);
         }
         default: {
-          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit 
the job directly");
+          log.info("Unsupported running mode [{}], quit the job directly", 
cfg.runningMode);
           return -1;
         }
       }
@@ -248,7 +247,7 @@ public class HoodieIndexer {
 
     Option<String> indexingInstant = client.scheduleIndexing(partitionTypes, 
Collections.emptyList());
     if (!indexingInstant.isPresent()) {
-      LOG.error("Scheduling of index action did not return any instant.");
+      log.error("Scheduling of index action did not return any instant.");
     }
     return indexingInstant;
   }
@@ -264,7 +263,7 @@ public class HoodieIndexer {
     Set<String> requestedIndexPartitionPaths = 
partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
     requestedIndexPartitionPaths.retainAll(indexedMetadataPartitions);
     if (!requestedIndexPartitionPaths.isEmpty()) {
-      LOG.error("Following indexes already built: " + 
requestedIndexPartitionPaths);
+      log.error("Following indexes already built: {}", 
requestedIndexPartitionPaths);
       return true;
     }
     return false;
@@ -286,8 +285,7 @@ public class HoodieIndexer {
             .firstInstant();
         if (earliestPendingIndexInstant.isPresent()) {
           cfg.indexInstantTime = 
earliestPendingIndexInstant.get().requestedTime();
-          LOG.info("Found the earliest scheduled indexing instant which will 
be executed: "
-              + cfg.indexInstantTime);
+          log.info("Found the earliest scheduled indexing instant which will 
be executed: {}", cfg.indexInstantTime);
         } else {
           throw new HoodieIndexException("There is no scheduled indexing in 
the table.");
         }
@@ -316,18 +314,18 @@ public class HoodieIndexer {
       client.dropIndex(partitionTypes);
       return 0;
     } catch (Exception e) {
-      LOG.error("Failed to drop index. ", e);
+      log.error("Failed to drop index. ", e);
       return -1;
     }
   }
 
   private boolean handleResponse(Option<HoodieIndexCommitMetadata> 
commitMetadata) {
     if (!commitMetadata.isPresent()) {
-      LOG.error("Indexing failed as no commit metadata present.");
+      log.error("Indexing failed as no commit metadata present.");
       return false;
     }
     List<HoodieIndexPartitionInfo> indexPartitionInfos = 
commitMetadata.get().getIndexPartitionInfos();
-    LOG.info("Indexing complete for partitions: {}", 
indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList()));
+    log.info("Indexing complete for partitions: {}", 
indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList()));
     return isIndexBuiltForAllRequestedTypes(indexPartitionInfos);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index daf9370c3550..d6876bea6e69 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -97,6 +97,8 @@ import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -105,8 +107,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.sql.functions;
 import org.apache.spark.storage.StorageLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -204,10 +204,10 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getLogFileColumnR
  * --min-validate-interval-seconds 60
  * ```
  */
+@Slf4j
 public class HoodieMetadataTableValidator implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMetadataTableValidator.class);
 
   // Spark context
   private transient JavaSparkContext jsc;
@@ -222,6 +222,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
   private final String taskLabels;
 
+  @Getter
   private final List<Throwable> throwables = new ArrayList<>();
 
   public HoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
@@ -240,21 +241,13 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           .build());
     } catch (TableNotFoundException tbe) {
       // Suppress the TableNotFound exception, table not yet created for a new 
stream
-      LOG.warn("Data table is not found. Skip current validation for: {}", 
cfg.basePath);
+      log.warn("Data table is not found. Skip current validation for: {}", 
cfg.basePath);
     }
 
     this.asyncMetadataTableValidateService = cfg.continuous ? Option.of(new 
AsyncMetadataTableValidateService()) : Option.empty();
     this.taskLabels = generateValidationTaskLabels();
   }
 
-  /**
-   * Returns list of Throwable which were encountered during validation. This 
method is useful
-   * when ignoreFailed parameter is set to true.
-   */
-  public List<Throwable> getThrowables() {
-    return throwables;
-  }
-
   /**
    * Returns true if there is a validation failure encountered during 
validation.
    * This method is useful when ignoreFailed parameter is set to true.
@@ -514,7 +507,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       HoodieMetadataTableValidator validator = new 
HoodieMetadataTableValidator(jsc, cfg);
       validator.run();
     } catch (Throwable throwable) {
-      LOG.error("Fail to do hoodie metadata table validation for {}", cfg, 
throwable);
+      log.error("Fail to do hoodie metadata table validation for {}", cfg, 
throwable);
     } finally {
       jsc.stop();
     }
@@ -522,17 +515,17 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
   public boolean run() {
     if (!metaClientOpt.isPresent()) {
-      LOG.warn("Data table is not available to read for now, skip current 
validation for: {}", cfg.basePath);
+      log.warn("Data table is not available to read for now, skip current 
validation for: {}", cfg.basePath);
       return true;
     }
     boolean result = false;
     try {
-      LOG.info(cfg.toString());
+      log.info(cfg.toString());
       if (cfg.continuous) {
-        LOG.info(" ****** do hoodie metadata table validation in CONTINUOUS 
mode - {} ******", taskLabels);
+        log.info(" ****** do hoodie metadata table validation in CONTINUOUS 
mode - {} ******", taskLabels);
         doHoodieMetadataTableValidationContinuous();
       } else {
-        LOG.info(" ****** do hoodie metadata table validation once - {} 
******", taskLabels);
+        log.info(" ****** do hoodie metadata table validation once - {} 
******", taskLabels);
         result = doHoodieMetadataTableValidationOnce();
       }
       return result;
@@ -554,7 +547,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     try {
       return doMetadataTableValidation();
     } catch (Throwable e) {
-      LOG.error("Metadata table validation failed to HoodieValidationException 
{}", taskLabels, e);
+      log.error("Metadata table validation failed to HoodieValidationException 
{}", taskLabels, e);
       if (!cfg.ignoreFailed) {
         throw e;
       }
@@ -577,7 +570,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
   public boolean doMetadataTableValidation() {
     boolean finalResult = true;
     if (!metaClientOpt.isPresent()) {
-      LOG.warn("Data table is not available to read for now, skip current 
validation.");
+      log.warn("Data table is not available to read for now, skip current 
validation.");
       return true;
     }
     HoodieTableMetaClient metaClient = this.metaClientOpt.get();
@@ -619,7 +612,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
     List<String> allPartitions = validatePartitions(engineContext, basePath, 
metaClient);
     if (allPartitions.isEmpty()) {
-      LOG.warn("The result of getting all partitions is null or empty, skip 
current validation. {}", taskLabels);
+      log.warn("The result of getting all partitions is null or empty, skip 
current validation. {}", taskLabels);
       return true;
     }
 
@@ -632,10 +625,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           engineContext.parallelize(allPartitions, 
allPartitions.size()).map(partitionPath -> {
             try {
               validateFilesInPartition(metadataTableBasedContext, 
fsBasedContext, partitionPath, finalBaseFilesForCleaning);
-              LOG.info("Metadata table validation succeeded for partition {} 
(partition {})", partitionPath, taskLabels);
+              log.info("Metadata table validation succeeded for partition {} 
(partition {})", partitionPath, taskLabels);
               return Pair.<Boolean, HoodieValidationException>of(true, null);
             } catch (HoodieValidationException e) {
-              LOG.error("Metadata table validation failed for partition {} due 
to HoodieValidationException (partition {})",
+              log.error("Metadata table validation failed for partition {} due 
to HoodieValidationException (partition {})",
                   partitionPath, taskLabels, e);
               if (!cfg.ignoreFailed) {
                 throw e;
@@ -672,7 +665,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       for (Pair<Boolean, HoodieValidationException> res : result) {
         finalResult &= res.getKey();
         if (res.getKey().equals(false)) {
-          LOG.error("Metadata Validation failed for table: {}", cfg.basePath, 
res.getValue());
+          log.error("Metadata Validation failed for table: {}", cfg.basePath, 
res.getValue());
           if (res.getRight() != null) {
             throwables.add(res.getRight());
           }
@@ -680,10 +673,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       }
 
       if (finalResult) {
-        LOG.info("Metadata table validation succeeded ({}).", taskLabels);
+        log.info("Metadata table validation succeeded ({}).", taskLabels);
         return true;
       } else {
-        LOG.error("Metadata table validation failed ({}).", taskLabels);
+        log.error("Metadata table validation failed ({}).", taskLabels);
         return false;
       }
     } catch (HoodieValidationException validationException) {
@@ -697,14 +690,14 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         throw new HoodieValidationException("Unexpected spark failure", 
sparkException);
       }
     } catch (Exception e) {
-      LOG.warn("Error closing HoodieMetadataValidationContext, "
+      log.warn("Error closing HoodieMetadataValidationContext, "
           + "ignoring the error as the validation is successful.", e);
       return true;
     }
   }
 
   private void handleValidationException(HoodieValidationException e, 
List<Pair<Boolean, HoodieValidationException>> result, String errorMsg) {
-    LOG.error("{} for table: {} ", errorMsg, cfg.basePath, e);
+    log.error("{} for table: {} ", errorMsg, cfg.basePath, e);
     if (!cfg.ignoreFailed) {
       throw e;
     }
@@ -725,7 +718,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       int finishedInstants = 
mdtMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
       if (finishedInstants == 0) {
         if 
(metaClientOpt.get().getCommitsTimeline().filterCompletedInstants().countInstants()
 == 0) {
-          LOG.info("There is no completed commit in both metadata table and 
corresponding data table: {}", taskLabels);
+          log.info("There is no completed commit in both metadata table and 
corresponding data table: {}", taskLabels);
           return false;
         } else {
           throw new HoodieValidationException("There is no completed instant 
for metadata table: " + cfg.basePath);
@@ -734,10 +727,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       return true;
     } catch (TableNotFoundException tbe) {
       // Suppress the TableNotFound exception if Metadata table is not 
available to read for now
-      LOG.warn("Metadata table is not found for table: {}. Skip current 
validation.", cfg.basePath);
+      log.warn("Metadata table is not found for table: {}. Skip current 
validation.", cfg.basePath);
       return false;
     } catch (Exception ex) {
-      LOG.warn("Metadata table is not available to read for now for table: {}, 
", cfg.basePath, ex);
+      log.warn("Metadata table is not available to read for now for table: {}, 
", cfg.basePath, ex);
       return false;
     }
   }
@@ -776,7 +769,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
                 Option<HoodieInstant> lastInstant = 
completedTimeline.lastInstant();
                 if (lastInstant.isPresent()
                     && 
InstantComparison.compareTimestamps(partitionCreationTimeOpt.get(), 
GREATER_THAN, lastInstant.get().requestedTime())) {
-                  LOG.info("Ignoring additional partition {}, as it was 
deduced to be part of a "
+                  log.info("Ignoring additional partition {}, as it was 
deduced to be part of a "
                       + "latest completed commit which was inflight when FS 
based listing was polled.", partitionFromMDT);
                   actualAdditionalPartitionsInMDT.remove(partitionFromMDT);
                 }
@@ -804,7 +797,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         }).collect(Collectors.toList());
         additionalFromFS.removeAll(emptyPartitions);
         if (additionalFromFS.isEmpty()) {
-          LOG.info("All out of sync partitions turned out to be empty {}", 
emptyPartitions);
+          log.info("All out of sync partitions turned out to be empty {}", 
emptyPartitions);
           misMatch.set(false);
         } else {
           misMatch.set(true);
@@ -819,7 +812,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
             + toStringWithThreshold(actualAdditionalPartitionsInMDT, 
cfg.logDetailMaxLength)
             + "\".\n All " + allPartitionPathsFromFS.size() + " partitions 
from FS listing "
             + toStringWithThreshold(allPartitionPathsFromFS, 
cfg.logDetailMaxLength);
-        LOG.error(message);
+        log.error(message);
         throw new HoodieValidationException(message);
       }
     }
@@ -1252,10 +1245,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     if (countKeyFromTable != countKeyFromRecordIndex) {
       String message = String.format("Validation of record index count failed: 
%s entries from record index metadata, %s keys from the data table: %s",
           countKeyFromRecordIndex, countKeyFromTable, cfg.basePath);
-      LOG.error(message);
+      log.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info("Validation of record index count succeeded: {} entries. Table: 
{}", countKeyFromRecordIndex, cfg.basePath);
+      log.info("Validation of record index count succeeded: {} entries. Table: 
{}", countKeyFromRecordIndex, cfg.basePath);
     }
   }
 
@@ -1338,10 +1331,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
               + "%s keys (total %s) from the data table have wrong location in 
record index "
               + "metadata. Table: %s   Sample mismatches: %s",
           diffCount, countKey, cfg.basePath, String.join(";", 
result.getRight()));
-      LOG.error(message);
+      log.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info("Validation of record index content succeeded: {} entries. 
Table: {}", countKey, cfg.basePath);
+      log.info("Validation of record index content succeeded: {} entries. 
Table: {}", countKey, cfg.basePath);
     }
   }
 
@@ -1469,10 +1462,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     if (mismatch) {
       String message = String.format("Validation of %s for partition %s failed 
for table: %s. %s",
           label, partitionPath, cfg.basePath, errorDetails);
-      LOG.error(message);
+      log.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info("Validation of {} succeeded for partition {} for table: {}", 
label, partitionPath, cfg.basePath);
+      log.info("Validation of {} succeeded for partition {} for table: {}", 
label, partitionPath, cfg.basePath);
     }
   }
 
@@ -1522,7 +1515,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           mismatch = true;
           break;
         } else {
-          LOG.info("There are uncommitted log files in the latest file slices 
but the committed log files match: {} {}", fileSlice1, fileSlice2);
+          log.info("There are uncommitted log files in the latest file slices 
but the committed log files match: {} {}", fileSlice1, fileSlice2);
         }
       }
     }
@@ -1530,10 +1523,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     if (mismatch) {
       String message = String.format("Validation of %s for partition %s failed 
for table: %s. %s",
           label, partitionPath, cfg.basePath, errorDetails);
-      LOG.error(message);
+      log.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info("Validation of {} succeeded for partition {} for table: {}", 
label, partitionPath, cfg.basePath);
+      log.info("Validation of {} succeeded for partition {} for table: {}", 
label, partitionPath, cfg.basePath);
     }
   }
 
@@ -1565,16 +1558,16 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         // truncate start instant since range is not
         Set<String> missingCommits = 
nonActiveInstantTimes.stream().filter(instant -> 
!archivedInstants.contains(instant)).collect(Collectors.toSet());
         if (!missingCommits.isEmpty()) {
-          LOG.warn("File slices in file system belong to missing commits: {}", 
String.join(",", missingCommits));
+          log.warn("File slices in file system belong to missing commits: {}", 
String.join(",", missingCommits));
           
activeTimeline.getRollbackTimeline().getInstantsAsStream().forEach(instant -> {
             HoodieInstant requestedInstant = 
metaClient.getInstantGenerator().getRollbackRequestedInstant(instant);
             try {
               HoodieRollbackPlan rollbackPlan = 
activeTimeline.readInstantContent(requestedInstant, HoodieRollbackPlan.class);
               if 
(missingCommits.contains(rollbackPlan.getInstantToRollback().getCommitTime())) {
-                LOG.warn("Missing commit ({}) is part of rollback plan: {}", 
rollbackPlan.getInstantToRollback().getCommitTime(), rollbackPlan);
+                log.warn("Missing commit ({}) is part of rollback plan: {}", 
rollbackPlan.getInstantToRollback().getCommitTime(), rollbackPlan);
               }
             } catch (IOException ex) {
-              LOG.warn("Failed to deserialize rollback plan for instant: {}", 
requestedInstant, ex);
+              log.warn("Failed to deserialize rollback plan for instant: {}", 
requestedInstant, ex);
             }
           });
         }
@@ -1663,7 +1656,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       try {
         HoodieSchema readerSchema = 
TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePathStr));
         if (readerSchema == null) {
-          LOG.warn("Cannot read schema from log file {}. Skip the check as 
it's likely being written by an inflight instant.", logFilePathStr);
+          log.warn("Cannot read schema from log file {}. Skip the check as 
it's likely being written by an inflight instant.", logFilePathStr);
           continue;
         }
         reader =
@@ -1700,7 +1693,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
                   "Log file is committed in an instant in active timeline: 
instantTime=%s %s",
                   instantTime, logFilePathStr));
             } else {
-              LOG.warn("Log file is uncommitted in a completed instant, likely 
due to retry: instantTime={} {}", instantTime, logFilePathStr);
+              log.warn("Log file is uncommitted in a completed instant, likely 
due to retry: instantTime={} {}", instantTime, logFilePathStr);
             }
           } else if 
(completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) {
             // The instant is in archived timeline
@@ -1710,18 +1703,18 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           } else if (inflightInstantsTimeline.containsInstant(instantTime)) {
             // The instant is inflight in active timeline
             // hit an uncommitted block possibly from a failed write
-            LOG.warn("Log file is uncommitted because of an inflight instant: 
instantTime={} {}", instantTime, logFilePathStr);
+            log.warn("Log file is uncommitted because of an inflight instant: 
instantTime={} {}", instantTime, logFilePathStr);
           } else {
             // The instant is after the start of the active timeline,
             // but it cannot be found in the active timeline
-            LOG.warn("Log file is uncommitted because the instant is after the 
start of the active timeline but absent or in requested in the active timeline: 
instantTime={} {}",
+            log.warn("Log file is uncommitted because the instant is after the 
start of the active timeline but absent or in requested in the active timeline: 
instantTime={} {}",
                 instantTime, logFilePathStr);
           }
         } else {
-          LOG.warn("There is no log block in {}", logFilePathStr);
+          log.warn("There is no log block in {}", logFilePathStr);
         }
       } catch (IOException e) {
-        LOG.warn("Cannot read log file {}. Skip the check as it's likely being 
written by an inflight instant.",
+        log.warn("Cannot read log file {}. Skip the check as it's likely being 
written by an inflight instant.",
             logFilePathStr, e);
       } finally {
         FileIOUtils.closeQuietly(reader);
@@ -1756,11 +1749,11 @@ public class HoodieMetadataTableValidator implements 
Serializable {
             long toSleepMs = cfg.minValidateIntervalSeconds * 1000 - 
(System.currentTimeMillis() - start);
 
             if (toSleepMs > 0) {
-              LOG.info("Last validate ran less than min validate interval: {} 
s, sleep: {} ms.", cfg.minValidateIntervalSeconds, toSleepMs);
+              log.info("Last validate ran less than min validate interval: {} 
s, sleep: {} ms.", cfg.minValidateIntervalSeconds, toSleepMs);
               Thread.sleep(toSleepMs);
             }
           } catch (HoodieValidationException e) {
-            LOG.error("Shutting down AsyncMetadataTableValidateService due to 
HoodieValidationException", e);
+            log.error("Shutting down AsyncMetadataTableValidateService due to 
HoodieValidationException", e);
             if (!cfg.ignoreFailed) {
               throw e;
             }
@@ -1816,15 +1809,18 @@ public class HoodieMetadataTableValidator implements 
Serializable {
    * the same information regardless of whether metadata table is enabled, 
which is
    * verified in the {@link HoodieMetadataTableValidator}.
    */
+  @Slf4j
   private static class HoodieMetadataValidationContext implements 
AutoCloseable, Serializable {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMetadataValidationContext.class);
-
     private final Properties props;
+    @Getter
     private final HoodieTableMetaClient metaClient;
+    @Getter
     private final HoodieMetadataConfig metadataConfig;
+    @Getter
     private final HoodieSchema schema;
     private final HoodieTableFileSystemView fileSystemView;
+    @Getter
     private final HoodieTableMetadata tableMetadata;
     private final boolean enableMetadataTable;
     private List<String> allColumnNameList;
@@ -1865,10 +1861,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
                                                         
FileSystemViewStorageConfig viewConf, HoodieCommonConfig commonConfig) {
       switch (viewConf.getStorageType()) {
         case SPILLABLE_DISK:
-          LOG.debug("Creating Spillable Disk based Table View");
+          log.debug("Creating Spillable Disk based Table View");
           break;
         case MEMORY:
-          LOG.debug("Creating in-memory based Table View");
+          log.debug("Creating in-memory based Table View");
           break;
         default:
           throw new HoodieException("Unsupported storage type " + 
viewConf.getStorageType() + ", used with HoodieMetadataTableValidator");
@@ -1876,22 +1872,6 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       return (HoodieTableFileSystemView) 
FileSystemViewManager.createViewManager(context, metadataConfig, viewConf, 
commonConfig, unused -> tableMetadata).getFileSystemView(metaClient);
     }
 
-    public HoodieTableMetaClient getMetaClient() {
-      return metaClient;
-    }
-
-    public HoodieMetadataConfig getMetadataConfig() {
-      return metadataConfig;
-    }
-
-    public HoodieSchema getSchema() {
-      return schema;
-    }
-
-    public HoodieTableMetadata getTableMetadata() {
-      return tableMetadata;
-    }
-
     public List<HoodieBaseFile> getSortedLatestBaseFileList(String 
partitionPath) {
       return fileSystemView.getLatestBaseFiles(partitionPath)
           .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
@@ -1909,7 +1889,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public List<HoodieColumnRangeMetadata<Comparable>> 
getSortedColumnStatsList(String partitionPath, List<String> fileNames, 
HoodieSchema readerSchema) {
-      LOG.info("All column names for getting column stats: {}", 
allColumnNameList);
+      log.info("All column names for getting column stats: {}", 
allColumnNameList);
       if (enableMetadataTable) {
         List<Pair<String, String>> partitionFileNameList = fileNames.stream()
             .map(filename -> Pair.of(partitionPath, 
filename)).collect(Collectors.toList());
@@ -1993,11 +1973,11 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           .getFileReader(new HoodieConfig(), path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {
-          LOG.error("Failed to read bloom filter for {}", path);
+          log.error("Failed to read bloom filter for {}", path);
           return Option.empty();
         }
       } catch (IOException e) {
-        LOG.error("Failed to get file reader for {} {}", path, e);
+        log.error("Failed to get file reader for {} {}", path, e);
         return Option.empty();
       }
       return Option.of(BloomFilterData.builder()
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index 11e4514d7d34..a3781999828f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -41,11 +41,10 @@ import org.apache.hudi.table.repair.RepairUtils;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -139,9 +138,9 @@ import java.util.stream.Collectors;
  * --backup-path backup_path
  * ```
  */
+@Slf4j
 public class HoodieRepairTool {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRepairTool.class);
   private static final String BACKUP_DIR_PREFIX = "hoodie_repair_backup_";
   // Repair config
   private final Config cfg;
@@ -175,41 +174,39 @@ public class HoodieRepairTool {
     Option<String> endingInstantOption = 
Option.ofNullable(cfg.endingInstantTime);
 
     if (startingInstantOption.isPresent() && endingInstantOption.isPresent()) {
-      LOG.info(String.format("Start repairing completed instants between %s 
and %s (inclusive)",
-          startingInstantOption.get(), endingInstantOption.get()));
+      log.info("Start repairing completed instants between {} and {} 
(inclusive)",
+          startingInstantOption.get(), endingInstantOption.get());
     } else if (startingInstantOption.isPresent()) {
-      LOG.info(String.format("Start repairing completed instants from %s 
(inclusive)",
-          startingInstantOption.get()));
+      log.info("Start repairing completed instants from {} (inclusive)", 
startingInstantOption.get());
     } else if (endingInstantOption.isPresent()) {
-      LOG.info(String.format("Start repairing completed instants till %s 
(inclusive)",
-          endingInstantOption.get()));
+      log.info("Start repairing completed instants till {} (inclusive)", 
endingInstantOption.get());
     } else {
-      LOG.info("Start repairing all completed instants");
+      log.info("Start repairing all completed instants");
     }
 
     try {
       Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
       switch (mode) {
         case REPAIR:
-          LOG.info(" ****** The repair tool is in REPAIR mode, dangling data 
and logs files "
+          log.info(" ****** The repair tool is in REPAIR mode, dangling data 
and logs files "
               + "not belonging to any commit are going to be DELETED from the 
table ******");
           if (checkBackupPathForRepair() < 0) {
-            LOG.error("Backup path check failed.");
+            log.error("Backup path check failed.");
             return false;
           }
           return doRepair(startingInstantOption, endingInstantOption, false);
         case DRY_RUN:
-          LOG.info(" ****** The repair tool is in DRY_RUN mode, "
+          log.info(" ****** The repair tool is in DRY_RUN mode, "
               + "only LOOKING FOR dangling data and log files from the table 
******");
           return doRepair(startingInstantOption, endingInstantOption, true);
         case UNDO:
           if (checkBackupPathAgainstBasePath() < 0) {
-            LOG.error("Backup path check failed.");
+            log.error("Backup path check failed.");
             return false;
           }
           return undoRepair();
         default:
-          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit 
the job directly");
+          log.info("Unsupported running mode [{}], quit the job directly", 
cfg.runningMode);
           return false;
       }
     } catch (IOException e) {
@@ -229,7 +226,7 @@ public class HoodieRepairTool {
     try {
       new HoodieRepairTool(jsc, cfg).run();
     } catch (Throwable throwable) {
-      LOG.error("Fail to run table repair for " + cfg.basePath, throwable);
+      log.error("Fail to run table repair for {}", cfg.basePath, throwable);
     } finally {
       jsc.stop();
     }
@@ -264,8 +261,7 @@ public class HoodieRepairTool {
               }
             } catch (IOException e) {
               // Copy Fail
-              LOG.error(String.format("Copying file fails: source [%s], 
destination [%s]",
-                  sourcePath, destPath));
+              log.error("Copying file fails: source [{}], destination [{}]", 
sourcePath, destPath);
             } finally {
               results.add(success);
             }
@@ -321,7 +317,7 @@ public class HoodieRepairTool {
             try {
               success = fs.delete(new Path(basePath, relativeFilePath), false);
             } catch (IOException e) {
-              LOG.error("Failed to delete file {}", relativeFilePath);
+              log.error("Failed to delete file {}", relativeFilePath);
             } finally {
               results.add(success);
             }
@@ -378,12 +374,12 @@ public class HoodieRepairTool {
           .collect(Collectors.toList());
       if (relativeFilePathsToDelete.size() > 0) {
         if (!backupFiles(relativeFilePathsToDelete)) {
-          LOG.error("Error backing up dangling files. Exiting...");
+          log.error("Error backing up dangling files. Exiting...");
           return false;
         }
         return deleteFiles(context, cfg.basePath, relativeFilePathsToDelete);
       }
-      LOG.info(String.format("Table repair on %s is successful", 
cfg.basePath));
+      log.info("Table repair on {} is successful", cfg.basePath);
     }
     return true;
   }
@@ -398,14 +394,14 @@ public class HoodieRepairTool {
     String backupPathStr = cfg.backupPath;
     StoragePath backupPath = new StoragePath(backupPathStr);
     if (!storage.exists(backupPath)) {
-      LOG.error("Cannot find backup path: " + backupPath);
+      log.error("Cannot find backup path: {}", backupPath);
       return false;
     }
 
     List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths();
 
     if (allPartitionPaths.isEmpty()) {
-      LOG.error("Cannot get one partition path since there is no partition 
available");
+      log.error("Cannot get one partition path since there is no partition 
available");
       return false;
     }
 
@@ -446,7 +442,7 @@ public class HoodieRepairTool {
     StoragePath backupPath = new StoragePath(cfg.backupPath);
     if (metaClient.getStorage().exists(backupPath)
         && metaClient.getStorage().listDirectEntries(backupPath).size() > 0) {
-      LOG.error(String.format("Cannot use backup path %s: it is not empty", 
cfg.backupPath));
+      log.error("Cannot use backup path {}: it is not empty", cfg.backupPath);
       return -1;
     }
 
@@ -461,13 +457,12 @@ public class HoodieRepairTool {
    */
   int checkBackupPathAgainstBasePath() {
     if (cfg.backupPath == null) {
-      LOG.error("Backup path is not configured");
+      log.error("Backup path is not configured");
       return -1;
     }
 
     if (cfg.backupPath.contains(cfg.basePath)) {
-      LOG.error(String.format("Cannot use backup path %s: it resides in the 
base path %s",
-          cfg.backupPath, cfg.basePath));
+      log.error("Cannot use backup path {}: it resides in the base path {}", 
cfg.backupPath, cfg.basePath);
       return -1;
     }
     return 0;
@@ -502,11 +497,11 @@ public class HoodieRepairTool {
   private void printRepairInfo(
       List<String> instantTimesToRepair, List<ImmutablePair<String, 
List<String>>> instantsWithDanglingFiles) {
     int numInstantsToRepair = instantsWithDanglingFiles.size();
-    LOG.info("Number of instants verified based on the base and log files: 
{}", instantTimesToRepair.size());
-    LOG.info("Instant timestamps: {}", instantTimesToRepair);
-    LOG.info("Number of instants to repair: {}", numInstantsToRepair);
+    log.info("Number of instants verified based on the base and log files: 
{}", instantTimesToRepair.size());
+    log.info("Instant timestamps: {}", instantTimesToRepair);
+    log.info("Number of instants to repair: {}", numInstantsToRepair);
     if (numInstantsToRepair > 0) {
-      instantsWithDanglingFiles.forEach(e -> LOG.info("   ** Removing files: 
{}", e.getValue()));
+      instantsWithDanglingFiles.forEach(e -> log.info("   ** Removing files: 
{}", e.getValue()));
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 38794c5345fd..78e72ee31c3b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -49,6 +49,7 @@ import com.beust.jcommander.IValueValidator;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,8 +63,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -79,6 +78,7 @@ import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
 /**
  * Export the latest records of Hudi dataset to a set of external files (e.g., 
plain parquet files).
  */
+@Slf4j
 public class HoodieSnapshotExporter {
 
   @FunctionalInterface
@@ -88,8 +88,6 @@ public class HoodieSnapshotExporter {
 
   }
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSnapshotExporter.class);
-
   public static class OutputFormatValidator implements IValueValidator<String> 
{
 
     public static final String HUDI = "hudi";
@@ -159,8 +157,7 @@ public class HoodieSnapshotExporter {
         .<HoodieSnapshotExporterException>orElseThrow(() -> {
           throw new HoodieSnapshotExporterException("No commits present. 
Nothing to snapshot.");
         });
-    LOG.info(String.format("Starting to snapshot latest version files which 
are also no-late-than %s.",
-        latestCommitTimestamp));
+    log.info("Starting to snapshot latest version files which are also 
no-late-than {}.", latestCommitTimestamp);
 
     final HoodieSparkEngineContext engineContext = new 
HoodieSparkEngineContext(jsc);
     final List<String> partitions = getPartitions(engineContext, cfg, 
HoodieStorageUtils.getStorage(
@@ -169,7 +166,7 @@ public class HoodieSnapshotExporter {
     if (partitions.isEmpty()) {
       throw new HoodieSnapshotExporterException("The source dataset has 0 
partition to snapshot.");
     }
-    LOG.info(String.format("The job needs to export %d partitions.", 
partitions.size()));
+    log.info("The job needs to export {} partitions.", partitions.size());
 
     if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
       exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp, 
tableMetadata);
@@ -194,7 +191,7 @@ public class HoodieSnapshotExporter {
   private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
     Path successTagPath = new Path(cfg.targetOutputPath + "/_SUCCESS");
     if (!fs.exists(successTagPath)) {
-      LOG.info(String.format("Creating _SUCCESS under target output path: %s", 
cfg.targetOutputPath));
+      log.info("Creating _SUCCESS under target output path: {}", 
cfg.targetOutputPath);
       fs.createNewFile(successTagPath);
     }
   }
@@ -285,7 +282,7 @@ public class HoodieSnapshotExporter {
     }, parallelism);
 
     // Also copy the .commit files
-    LOG.info(String.format("Copying .commit files which are no-late-than %s.", 
latestCommitTimestamp));
+    log.info("Copying .commit files which are no-late-than {}.", 
latestCommitTimestamp);
     List<FileStatus> commitFilesListToCopy =
         Arrays.stream(sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + 
HoodieTableMetaClient.TIMELINEFOLDER_NAME)))
             .filter(fileStatus -> {
@@ -344,7 +341,7 @@ public class HoodieSnapshotExporter {
     }
 
     JavaSparkContext jsc = 
UtilHelpers.buildSparkContext("Hoodie-snapshot-exporter", "local[*]", 
cfg.enableHiveSupport);
-    LOG.info("Initializing spark job.");
+    log.info("Initializing spark job.");
 
     try {
       new HoodieSnapshotExporter().export(jsc, cfg);
@@ -359,13 +356,13 @@ public class HoodieSnapshotExporter {
       switch (config.transformerClassName) {
         case "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer":
           if (StringUtils.isNullOrEmpty(config.transformerSql)) {
-            LOG.error("--transformer-sql is required when using 
SqlQueryBasedTransformer");
+            log.error("--transformer-sql is required when using 
SqlQueryBasedTransformer");
             valid = false;
           }
           break;
         case "org.apache.hudi.utilities.transform.SqlFileBasedTransformer":
           if (StringUtils.isNullOrEmpty(config.transformerSqlFile)) {
-            LOG.error("--transformer-sql-file is required when using 
SqlFileBasedTransformer");
+            log.error("--transformer-sql-file is required when using 
SqlFileBasedTransformer");
             valid = false;
           }
           break;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
index 519ce482d6ec..3d6144976028 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
@@ -31,10 +31,9 @@ import org.apache.hudi.exception.HoodieException;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -43,9 +42,9 @@ import java.util.List;
 /**
  * Utility class to run TTL management.
  */
+@Slf4j
 public class HoodieTTLJob {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTTLJob.class);
   private final Config cfg;
   private final TypedProperties props;
   private final JavaSparkContext jsc;
@@ -61,7 +60,7 @@ public class HoodieTTLJob {
     this.jsc = jsc;
     this.props = props;
     this.metaClient = metaClient;
-    LOG.info("Creating TTL job with configs : " + props.toString());
+    log.info("Creating TTL job with configs : {}", props.toString());
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
@@ -129,7 +128,7 @@ public class HoodieTTLJob {
       SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, 
exitCode);
     }
 
-    LOG.info("Hoodie TTL job ran successfully");
+    log.info("Hoodie TTL job ran successfully");
   }
 
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
index 524c4f549d2b..d1e25849c4b9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -41,12 +41,11 @@ import com.beust.jcommander.Parameter;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.UniformReservoir;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -97,10 +96,10 @@ import java.util.stream.Collectors;
  * --base-path <base-path> \
  * --num-days <number-of-days>
  */
+@Slf4j
 public class TableSizeStats implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(TableSizeStats.class);
 
   // Date formatter for parsing partition dates (example: 2023/5/5/ or 
2023-5-5).
   private static final DateTimeFormatter DATE_FORMATTER =
@@ -138,6 +137,7 @@ public class TableSizeStats implements Serializable {
   }
 
   public static class Config implements Serializable {
+
     @Parameter(names = {"--base-path", "-bp"}, description = "Base path for 
the table", required = false)
     public String basePath = null;
 
@@ -241,9 +241,9 @@ public class TableSizeStats implements Serializable {
       TableSizeStats tableSizeStats = new TableSizeStats(jsc, cfg);
       tableSizeStats.run();
     } catch (TableNotFoundException e) {
-      LOG.warn("The Hudi data table is not found: [{}].", cfg.basePath, e);
+      log.warn("The Hudi data table is not found: [{}].", cfg.basePath, e);
     } catch (Throwable throwable) {
-      LOG.error("Failed to get table size stats for {}", cfg, throwable);
+      log.error("Failed to get table size stats for {}", cfg, throwable);
     } finally {
       jsc.stop();
     }
@@ -251,8 +251,8 @@ public class TableSizeStats implements Serializable {
 
   public void run() {
     try {
-      LOG.info(cfg.toString());
-      LOG.info(" ****** Fetching table size stats ******");
+      log.info(cfg.toString());
+      log.info(" ****** Fetching table size stats ******");
 
       // Determine starting and ending date intervals for filtering data files.
       LocalDate[] dateInterval = getUserSpecifiedDateInterval(cfg);
@@ -276,7 +276,7 @@ public class TableSizeStats implements Serializable {
 
   private void logTableStats(String basePath, LocalDate[] dateInterval) throws 
IOException {
 
-    LOG.info("Processing table {}", basePath);
+    log.info("Processing table {}", basePath);
     HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
         .enable(isMetadataEnabled(basePath, jsc))
         .build();
@@ -351,7 +351,7 @@ public class TableSizeStats implements Serializable {
       logStats("Table stats [path: " + basePath + "]", tableHistogram);
     } else {
       // Display only total talbe size
-      LOG.info("Total size: {}", 
getFileSizeUnit(Arrays.stream(tableHistogram.getSnapshot().getValues()).sum()));
+      log.info("Total size: {}", 
getFileSizeUnit(Arrays.stream(tableHistogram.getSnapshot().getValues()).sum()));
     }
   }
 
@@ -378,7 +378,7 @@ public class TableSizeStats implements Serializable {
         line = reader.readLine();
       }
     } catch (IOException ioe) {
-      LOG.error("Error reading in properties from dfs from file." + propsPath);
+      log.error("Error reading in properties from dfs from file. {}", 
propsPath);
       throw new HoodieIOException("Cannot read properties from dfs from file " 
+ propsPath, ioe);
     }
     return filePaths;
@@ -390,12 +390,12 @@ public class TableSizeStats implements Serializable {
     if (cfg.endDate != null) {
       try {
         endDate = LocalDate.parse(cfg.endDate, DATE_FORMATTER);
-        LOG.info("Setting ending date to {}. ", endDate);
+        log.info("Setting ending date to {}.", endDate);
       } catch (DateTimeParseException dtpe) {
         throw new HoodieException("Unable to parse --end-date. ", dtpe);
       }
     } else {
-      LOG.info("End date is not specified: {}.", endDate);
+      log.info("End date is not specified: {}.", endDate);
     }
 
     // Set startDate to null by default.
@@ -404,14 +404,14 @@ public class TableSizeStats implements Serializable {
     // Set startDate to cfg.startDate if specified. cfg.startDate takes 
priority over cfg.numDays if both are specified.
     if (cfg.startDate != null) {
       startDate = LocalDate.parse(cfg.startDate, DATE_FORMATTER);
-      LOG.info("Setting starting date to {}.", startDate);
+      log.info("Setting starting date to {}.", startDate);
     } else {
       if (cfg.numDays == 0) {
-        LOG.info("Start date not specified: {}.", startDate);
+        log.info("Start date not specified: {}.", startDate);
       } else if (cfg.numDays > 0) {
         endDate = LocalDate.now();
         startDate = endDate.minusDays(cfg.numDays);
-        LOG.info("Setting starting date to {} ({} - {} days). ", startDate, 
endDate, cfg.numDays);
+        log.info("Setting starting date to {} ({} - {} days). ", startDate, 
endDate, cfg.numDays);
       } else {
         throw new HoodieException("--num-days must specify a positive value.");
       }
@@ -422,7 +422,7 @@ public class TableSizeStats implements Serializable {
       throw new HoodieException("Starting date must be before ending date. 
Start Date: " + startDate + ", End Date: " + endDate);
     }
 
-    return startDate == null && endDate == null ? null : new 
LocalDate[]{startDate, endDate};
+    return startDate == null && endDate == null ? null : new LocalDate[] 
{startDate, endDate};
   }
 
   @Nullable
@@ -442,7 +442,7 @@ public class TableSizeStats implements Serializable {
     try {
       return LocalDate.parse(dateString, DATE_FORMATTER);
     } catch (DateTimeParseException dtpe) {
-      LOG.error("Partition name {} must conform to date format if 
--start-date, --end-date, or --num-days are specified. ", partition, dtpe);
+      log.error("Partition name {} must conform to date format if 
--start-date, --end-date, or --num-days are specified. ", partition, dtpe);
     }
     return partitionDate;
   }
@@ -458,17 +458,17 @@ public class TableSizeStats implements Serializable {
   }
 
   private static void logStats(String header, Histogram histogram) {
-    LOG.info(header);
+    log.info(header);
     Snapshot snapshot = histogram.getSnapshot();
-    LOG.info("Number of files: {}", snapshot.size());
-    LOG.info("Total size: {}", 
getFileSizeUnit(Arrays.stream(snapshot.getValues()).sum()));
-    LOG.info("Minimum file size: {}", getFileSizeUnit(snapshot.getMin()));
-    LOG.info("Maximum file size: {}", getFileSizeUnit(snapshot.getMax()));
-    LOG.info("Average file size: {}", getFileSizeUnit(snapshot.getMean()));
-    LOG.info("Median file size: {}", getFileSizeUnit(snapshot.getMedian()));
-    LOG.info("P50 file size: {}", getFileSizeUnit(snapshot.getValue(0.5)));
-    LOG.info("P90 file size: {}", getFileSizeUnit(snapshot.getValue(0.9)));
-    LOG.info("P95 file size: {}", getFileSizeUnit(snapshot.getValue(0.95)));
-    LOG.info("P99 file size: {}", getFileSizeUnit(snapshot.getValue(0.99)));
+    log.info("Number of files: {}", snapshot.size());
+    log.info("Total size: {}", 
getFileSizeUnit(Arrays.stream(snapshot.getValues()).sum()));
+    log.info("Minimum file size: {}", getFileSizeUnit(snapshot.getMin()));
+    log.info("Maximum file size: {}", getFileSizeUnit(snapshot.getMax()));
+    log.info("Average file size: {}", getFileSizeUnit(snapshot.getMean()));
+    log.info("Median file size: {}", getFileSizeUnit(snapshot.getMedian()));
+    log.info("P50 file size: {}", getFileSizeUnit(snapshot.getValue(0.5)));
+    log.info("P90 file size: {}", getFileSizeUnit(snapshot.getValue(0.9)));
+    log.info("P95 file size: {}", getFileSizeUnit(snapshot.getValue(0.95)));
+    log.info("P99 file size: {}", getFileSizeUnit(snapshot.getValue(0.99)));
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 9d04641f71b4..22bf3f4b4dcb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -76,6 +76,7 @@ import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -93,8 +94,6 @@ import org.apache.spark.sql.jdbc.JdbcDialect;
 import org.apache.spark.sql.jdbc.JdbcDialects;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.util.LongAccumulator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -126,6 +125,7 @@ import static 
org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_MAKE_
 /**
  * Bunch of helper methods.
  */
+@Slf4j
 public class UtilHelpers {
 
   public static final String EXECUTE = "execute";
@@ -133,8 +133,6 @@ public class UtilHelpers {
   public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
   public static final String PURGE_PENDING_INSTANT = "purge_pending_instant";
 
-  private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);
-
   public static HoodieRecordMerger createRecordMerger(Properties props) {
     return HoodieRecordUtils.createRecordMerger(null, EngineType.SPARK,
         StringUtils.split(ConfigUtils.getStringWithAltKeys(props, 
HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES, null), ","),
@@ -142,7 +140,7 @@ public class UtilHelpers {
   }
 
   public static Source createSource(String sourceClass, TypedProperties cfg, 
JavaSparkContext jssc,
-                                    SparkSession sparkSession, 
HoodieIngestionMetrics metrics, StreamContext streamContext) throws IOException 
{
+      SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext 
streamContext) throws IOException {
     // All possible constructors.
     Class<?>[] constructorArgsStreamContextMetrics = new Class<?>[] 
{TypedProperties.class, JavaSparkContext.class, SparkSession.class, 
HoodieIngestionMetrics.class, StreamContext.class};
     Class<?>[] constructorArgsStreamContext = new Class<?>[] 
{TypedProperties.class, JavaSparkContext.class, SparkSession.class, 
StreamContext.class};
@@ -168,7 +166,7 @@ public class UtilHelpers {
         String constructorSignature = Arrays.stream(constructor.getLeft())
             .map(Class::getSimpleName)
             .collect(Collectors.joining(", ", "[", "]"));
-        LOG.error("Unexpected error while loading source class {} with 
constructor signature {}", sourceClass, constructorSignature, e);
+        log.error("Unexpected error while loading source class {} with 
constructor signature {}", sourceClass, constructorSignature, e);
       } catch (Throwable t) {
         throw new IOException("Could not load source class due to unexpected 
error " + sourceClass, t);
       }
@@ -197,7 +195,7 @@ public class UtilHelpers {
   }
 
   public static SchemaProvider createSchemaProvider(String 
schemaProviderClass, TypedProperties cfg,
-                                                    JavaSparkContext jssc) 
throws IOException {
+      JavaSparkContext jssc) throws IOException {
     try {
       return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
           : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, 
cfg, jssc);
@@ -233,7 +231,7 @@ public class UtilHelpers {
   }
 
   public static Option<Transformer> createTransformer(Option<List<String>> 
classNamesOpt, Supplier<Option<HoodieSchema>> sourceSchemaSupplier,
-                                                      boolean 
isErrorTableWriterEnabled) throws IOException {
+      boolean isErrorTableWriterEnabled) throws IOException {
 
     try {
       Function<List<String>, Transformer> chainedTransformerFunction = 
classNames ->
@@ -255,13 +253,13 @@ public class UtilHelpers {
   }
 
   public static DFSPropertiesConfiguration readConfig(Configuration 
hadoopConfig,
-                                                      Path cfgPath,
-                                                      List<String> 
overriddenProps) {
+      Path cfgPath,
+      List<String> overriddenProps) {
     StoragePath storagePath = convertToStoragePath(cfgPath);
     DFSPropertiesConfiguration conf = new 
DFSPropertiesConfiguration(hadoopConfig, storagePath);
     try {
       if (!overriddenProps.isEmpty()) {
-        LOG.info("Adding overridden properties to file properties.");
+        log.info("Adding overridden properties to file properties.");
         conf.addPropsFromStream(new BufferedReader(new 
StringReader(String.join("\n", overriddenProps))), storagePath);
       }
     } catch (IOException ioe) {
@@ -275,7 +273,7 @@ public class UtilHelpers {
     DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration();
     try {
       if (!overriddenProps.isEmpty()) {
-        LOG.info("Adding overridden properties to file properties.");
+        log.info("Adding overridden properties to file properties.");
         conf.addPropsFromStream(new BufferedReader(new 
StringReader(String.join("\n", overriddenProps))), null);
       }
     } catch (IOException ioe) {
@@ -289,7 +287,7 @@ public class UtilHelpers {
     return StringUtils.isNullOrEmpty(propsFilePath)
         ? UtilHelpers.buildProperties(props)
         : UtilHelpers.readConfig(hadoopConf, new Path(propsFilePath), props)
-        .getProps(true);
+            .getProps(true);
   }
 
   public static TypedProperties buildProperties(List<String> props) {
@@ -310,7 +308,7 @@ public class UtilHelpers {
   /**
    * Parse Schema from file.
    *
-   * @param fs File System
+   * @param fs         File System
    * @param schemaFile Schema File
    */
   public static String parseSchema(FileSystem fs, String schemaFile) throws 
Exception {
@@ -412,9 +410,9 @@ public class UtilHelpers {
   /**
    * Build Hoodie write client.
    *
-   * @param jsc Java Spark Context
-   * @param basePath Base Path
-   * @param schemaStr Schema
+   * @param jsc         Java Spark Context
+   * @param basePath    Base Path
+   * @param schemaStr   Schema
    * @param parallelism Parallelism
    */
   public static SparkRDDWriteClient<HoodieRecordPayload> 
createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
@@ -440,14 +438,14 @@ public class UtilHelpers {
     writeResponse.foreach(writeStatus -> {
       if (writeStatus.hasErrors()) {
         errors.add(1);
-        LOG.error("Error processing records :writeStatus:{}", 
writeStatus.getStat().toString());
+        log.error("Error processing records :writeStatus:{}", 
writeStatus.getStat().toString());
       }
     });
     if (errors.value() == 0) {
-      LOG.info("Table imported into hoodie with {} instant time.", 
instantTime);
+      log.info("Table imported into hoodie with {} instant time.", 
instantTime);
       return 0;
     }
-    LOG.error("Import failed with {} errors.", errors.value());
+    log.error("Import failed with {} errors.", errors.value());
     return -1;
   }
 
@@ -455,11 +453,11 @@ public class UtilHelpers {
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     long errorsCount = 
writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
     if (errorsCount == 0) {
-      LOG.info("Finish job with {} instant time.", instantTime);
+      log.info("Finish job with {} instant time.", instantTime);
       return 0;
     }
 
-    LOG.error("Job failed with {} errors.", errorsCount);
+    log.error("Job failed with {} errors.", errorsCount);
     return -1;
   }
 
@@ -571,7 +569,7 @@ public class UtilHelpers {
   }
 
   public static SchemaProvider 
wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
-                                                                               
     TypedProperties cfg, JavaSparkContext jssc, List<String> 
transformerClassNames) {
+      TypedProperties cfg, JavaSparkContext jssc, List<String> 
transformerClassNames) {
 
     if (provider == null) {
       return null;
@@ -601,16 +599,16 @@ public class UtilHelpers {
   }
 
   public static SchemaProvider createRowBasedSchemaProvider(StructType 
structType,
-                                                            TypedProperties 
cfg,
-                                                            JavaSparkContext 
jssc) {
+      TypedProperties cfg,
+      JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, 
null);
   }
 
   public static Option<HoodieSchema> getLatestTableSchema(JavaSparkContext 
jssc,
-                                                          HoodieStorage 
storage,
-                                                          String basePath,
-                                                          
HoodieTableMetaClient tableMetaClient) {
+      HoodieStorage storage,
+      String basePath,
+      HoodieTableMetaClient tableMetaClient) {
     try {
       if (FSUtils.isTableExists(basePath, storage)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(tableMetaClient);
@@ -618,7 +616,7 @@ public class UtilHelpers {
         return tableSchemaResolver.getTableSchemaFromLatestCommit(false);
       }
     } catch (Exception e) {
-      LOG.warn("Failed to fetch latest table's schema", e);
+      log.warn("Failed to fetch latest table's schema", e);
     }
 
     return Option.empty();
@@ -655,6 +653,7 @@ public class UtilHelpers {
 
   @FunctionalInterface
   public interface CheckedSupplier<T> {
+
     T get() throws Throwable;
   }
 
@@ -665,7 +664,7 @@ public class UtilHelpers {
         ret = supplier.get();
       } while (ret != 0 && maxRetryCount-- > 0);
     } catch (Throwable t) {
-      LOG.error(errorMessage, t);
+      log.error(errorMessage, t);
       throw new RuntimeException("Failed in retry", t);
     }
     return ret;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
index df466a0a7dd6..382fe27b5d1d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
@@ -25,13 +25,12 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Properties;
 
@@ -45,10 +44,9 @@ import static 
org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCal
 /**
  * Kafka implementation of {@link HoodieWriteCommitCallback}.
  */
+@Slf4j
 public class HoodieWriteCommitKafkaCallback implements 
HoodieWriteCommitCallback {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieWriteCommitKafkaCallback.class);
-
   private final HoodieConfig hoodieConfig;
   private final String bootstrapServers;
   private final String topic;
@@ -66,9 +64,9 @@ public class HoodieWriteCommitKafkaCallback implements 
HoodieWriteCommitCallback
     try (KafkaProducer<String, String> producer = 
createProducer(hoodieConfig)) {
       ProducerRecord<String, String> record = 
buildProducerRecord(hoodieConfig, callbackMsg);
       producer.send(record).get();
-      LOG.info("Send callback message succeed");
+      log.info("Send callback message succeed");
     } catch (Exception e) {
-      LOG.error("Send kafka callback msg failed : ", e);
+      log.error("Send kafka callback msg failed : ", e);
     }
   }
 
@@ -94,8 +92,8 @@ public class HoodieWriteCommitKafkaCallback implements 
HoodieWriteCommitCallback
     
kafkaProducerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.StringSerializer");
 
-    LOG.debug("Callback kafka producer init with configs: "
-        + 
HoodieWriteCommitCallbackUtil.convertToJsonString(kafkaProducerProps));
+    log.debug("Callback kafka producer init with configs: {}",
+        HoodieWriteCommitCallbackUtil.convertToJsonString(kafkaProducerProps));
     return new KafkaProducer<String, String>(kafkaProducerProps);
   }
 
@@ -138,11 +136,11 @@ public class HoodieWriteCommitKafkaCallback implements 
HoodieWriteCommitCallback
     @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
       if (null != metadata) {
-        LOG.info("message offset={} partition={} timestamp={} topic={}",
+        log.info("message offset={} partition={} timestamp={} topic={}",
                 metadata.offset(), metadata.partition(), metadata.timestamp(), 
metadata.topic());
       }
       if (null != exception) {
-        LOG.error("Send kafka callback msg failed : ", exception);
+        log.error("Send kafka callback msg failed: ", exception);
       }
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
index af4bbbf49e46..379aba2d19ec 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -32,8 +33,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -56,10 +55,9 @@ import static 
org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarC
 /**
  * Pulsar implementation of {@link HoodieWriteCommitCallback}.
  */
+@Slf4j
 public class HoodieWriteCommitPulsarCallback implements 
HoodieWriteCommitCallback, Closeable {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieWriteCommitPulsarCallback.class);
-
   private final String serviceUrl;
   private final String topic;
 
@@ -85,9 +83,9 @@ public class HoodieWriteCommitPulsarCallback implements 
HoodieWriteCommitCallbac
     String callbackMsg = 
HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
     try {
       
producer.newMessage().key(callbackMessage.getTableName()).value(callbackMsg).send();
-      LOG.info("Send callback message succeed");
+      log.info("Send callback message succeed");
     } catch (Exception e) {
-      LOG.error("Send pulsar callback msg failed : ", e);
+      log.error("Send pulsar callback msg failed: ", e);
     }
   }
 
@@ -165,7 +163,7 @@ public class HoodieWriteCommitPulsarCallback implements 
HoodieWriteCommitCallbac
       try {
         producer.close();
       } catch (Throwable t) {
-        LOG.warn("Could not properly close the producer.", t);
+        log.warn("Could not properly close the producer.", t);
       }
     }
 
@@ -173,7 +171,7 @@ public class HoodieWriteCommitPulsarCallback implements 
HoodieWriteCommitCallbac
       try {
         client.close();
       } catch (Throwable t) {
-        LOG.warn("Could not properly close the client.", t);
+        log.warn("Could not properly close the client.", t);
       }
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
index 365a01b60409..a2ffb4878454 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -23,6 +23,7 @@ import org.apache.hudi.exception.HoodieException;
 
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.NoArgsConstructor;
 import org.apache.avro.Schema;
 import org.apache.kafka.common.errors.SerializationException;
 
@@ -35,13 +36,11 @@ import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DES
 /**
  * Extending {@link KafkaAvroSchemaDeserializer} as we need to be able to 
inject reader schema during deserialization.
  */
+@NoArgsConstructor
 public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
 
   private Schema sourceSchema;
 
-  public KafkaAvroSchemaDeserializer() {
-  }
-
   public KafkaAvroSchemaDeserializer(SchemaRegistryClient client, Map<String, 
?> props) {
     super(client, props);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
index 04174f740767..57f3e7910c8e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionException.java
@@ -20,14 +20,14 @@ package org.apache.hudi.utilities.ingestion;
 
 import org.apache.hudi.exception.HoodieException;
 
+import lombok.NoArgsConstructor;
+
 /**
  * The root exception class for any failure with {@link 
HoodieIngestionService}.
  */
+@NoArgsConstructor
 public class HoodieIngestionException extends HoodieException {
 
-  public HoodieIngestionException() {
-  }
-
   public HoodieIngestionException(String message) {
     super(message);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
index 5f2c15f090c4..2769d851b107 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionService.java
@@ -27,8 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.streamer.PostWriteTerminationStrategy;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -40,10 +39,9 @@ import static 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.HoodieI
 /**
  * A generic service to facilitate running data ingestion.
  */
+@Slf4j
 public abstract class HoodieIngestionService extends HoodieAsyncService {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieIngestionService.class);
-
   protected HoodieIngestionConfig ingestionConfig;
 
   public HoodieIngestionService(HoodieIngestionConfig ingestionConfig) {
@@ -59,18 +57,18 @@ public abstract class HoodieIngestionService extends 
HoodieAsyncService {
    */
   public void startIngestion() {
     if (ingestionConfig.getBoolean(INGESTION_IS_CONTINUOUS)) {
-      LOG.info("Ingestion service starts running in continuous mode");
+      log.info("Ingestion service starts running in continuous mode");
       start(this::onIngestionCompletes);
       try {
         waitForShutdown();
       } catch (Exception e) {
         throw new HoodieIngestionException("Ingestion service was shut down 
with exception.", e);
       }
-      LOG.info("Ingestion service (continuous mode) has been shut down.");
+      log.info("Ingestion service (continuous mode) has been shut down.");
     } else {
-      LOG.info("Ingestion service starts running in run-once mode");
+      log.info("Ingestion service starts running in run-once mode");
       ingestOnce();
-      LOG.info("Ingestion service (run-once mode) has been shut down.");
+      log.info("Ingestion service (run-once mode) has been shut down.");
     }
   }
 
@@ -121,8 +119,8 @@ public abstract class HoodieIngestionService extends 
HoodieAsyncService {
       long minSyncInternalSeconds = 
ingestionConfig.getLongOrDefault(INGESTION_MIN_SYNC_INTERNAL_SECONDS);
       long sleepMs = minSyncInternalSeconds * 1000 - 
(System.currentTimeMillis() - ingestionStartEpochMillis);
       if (sleepMs > 0) {
-        LOG.info(String.format("Last ingestion took less than min sync 
interval: %d s; sleep for %.2f s",
-            minSyncInternalSeconds, sleepMs / 1000.0));
+        log.info("Last ingestion took less than min sync interval: {} s; sleep 
for {} s",
+            minSyncInternalSeconds, sleepMs / 1000.0);
         Thread.sleep(sleepMs);
       }
     } catch (InterruptedException e) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
index fe5decd7005c..03d67dd2d969 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ArchiveTask.java
@@ -25,21 +25,20 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.utilities.UtilHelpers;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Archive task to run in TableServicePipeline.
  *
  * @see HoodieMultiTableServicesMain
  */
+@Slf4j
 class ArchiveTask extends TableServiceTask {
-  private static final Logger LOG = LoggerFactory.getLogger(ArchiveTask.class);
 
   @Override
   void run() {
-    LOG.info("Run Archive with props: " + props);
+    log.info("Run Archive with props: {}", props);
     HoodieWriteConfig hoodieCfg = 
HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(new 
HoodieSparkEngineContext(jsc), hoodieCfg)) {
       UtilHelpers.retry(retry, () -> {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
index 66efbd475dc4..34d585b903d6 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/ClusteringTask.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
 import org.apache.spark.api.java.JavaSparkContext;
 
 /**
@@ -71,6 +73,7 @@ class ClusteringTask extends TableServiceTask {
   /**
    * Builder class for {@link ClusteringTask}.
    */
+  @NoArgsConstructor(access = AccessLevel.PRIVATE)
   public static final class Builder {
 
     /**
@@ -110,9 +113,6 @@ class ClusteringTask extends TableServiceTask {
      */
     private HoodieTableMetaClient metaClient;
 
-    private Builder() {
-    }
-
     public Builder withProps(TypedProperties props) {
       this.props = props;
       return this;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
index aade3d0a4854..9d4af77d3f42 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/HoodieMultiTableServicesMain.java
@@ -28,11 +28,10 @@ import org.apache.hudi.utilities.UtilHelpers;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -51,8 +50,9 @@ import java.util.stream.Collectors;
 /**
  * Main function for executing multi-table services.
  */
+@Slf4j
 public class HoodieMultiTableServicesMain {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMultiTableServicesMain.class);
+
   final Config cfg;
   final TypedProperties props;
 
@@ -107,7 +107,7 @@ public class HoodieMultiTableServicesMain {
   }
 
   public void startServices() throws ExecutionException, InterruptedException {
-    LOG.info("StartServices Config: " + cfg);
+    log.info("StartServices Config: {}", cfg);
     List<String> tablePaths;
     if (cfg.autoDiscovery) {
       // We support defining multi base paths
@@ -118,7 +118,7 @@ public class HoodieMultiTableServicesMain {
     } else {
       tablePaths = MultiTableServiceUtils.getTablesToBeServedFromProps(jsc, 
props);
     }
-    LOG.info("All table paths: " + String.join(",", tablePaths));
+    log.info("All table paths: {}", String.join(",", tablePaths));
     if (cfg.batch) {
       batchRunTableServices(tablePaths);
     } else {
@@ -253,7 +253,7 @@ public class HoodieMultiTableServicesMain {
     try {
       new HoodieMultiTableServicesMain(jsc, cfg).startServices();
     } catch (Throwable throwable) {
-      LOG.error("Fail to run table services, ", throwable);
+      log.error("Fail to run table services, ", throwable);
     } finally {
       jsc.stop();
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
index 41bd7248f0f3..3c1f1453f54e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/multitable/MultiTableServiceUtils.java
@@ -29,13 +29,12 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.utilities.UtilHelpers;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,8 +48,8 @@ import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
 /**
  * Utils for executing multi-table services.
  */
+@Slf4j
 public class MultiTableServiceUtils {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MultiTableServiceUtils.class);
 
   public static class Constants {
     public static final String TABLES_TO_BE_SERVED_PROP = 
"hoodie.tableservice.tablesToServe";
@@ -79,7 +78,7 @@ public class MultiTableServiceUtils {
               return true;
             } else {
               // Log the wrong path in console.
-              LOG.info("Hoodie table not found in path {}, skip", tablePath);
+              log.info("Hoodie table not found in path {}, skip", tablePath);
               return false;
             }
           }).collect(Collectors.toList());
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 330eece72e9a..39efad86c3d3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -40,10 +40,9 @@ import com.beust.jcommander.Parameter;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.UniformReservoir;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -62,10 +61,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
+@Slf4j
 public class TimelineServerPerf implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(TimelineServerPerf.class);
   private final Config cfg;
   private transient TimelineService timelineServer;
   private final boolean useExternalTimelineServer;
@@ -84,10 +83,10 @@ public class TimelineServerPerf implements Serializable {
   private void setHostAddrFromSparkConf(SparkConf sparkConf) {
     String hostAddr = sparkConf.get("spark.driver.host", null);
     if (hostAddr != null) {
-      LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", 
hostAddr, this.hostAddr);
+      log.info("Overriding hostIp to ({}) found in spark-conf. It was {}", 
hostAddr, this.hostAddr);
       this.hostAddr = hostAddr;
     } else {
-      LOG.warn("Unable to find driver bind address from spark config");
+      log.warn("Unable to find driver bind address from spark config");
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
index 5662c46169ce..dd7924a62a56 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
@@ -21,11 +21,13 @@ package org.apache.hudi.utilities.schema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
 
+import lombok.Getter;
 import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * SchemaProvider which uses separate Schema Providers for source and target.
  */
+@Getter
 public final class DelegatingSchemaProvider extends SchemaProvider {
 
   private final SchemaProvider sourceSchemaProvider;
@@ -48,12 +50,4 @@ public final class DelegatingSchemaProvider extends 
SchemaProvider {
   public HoodieSchema getTargetHoodieSchema() {
     return targetSchemaProvider.getTargetHoodieSchema();
   }
-
-  public SchemaProvider getSourceSchemaProvider() {
-    return sourceSchemaProvider;
-  }
-
-  public SchemaProvider getTargetSchemaProvider() {
-    return targetSchemaProvider;
-  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 97447580ced2..ff843f1ab537 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -31,6 +31,7 @@ import 
org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import lombok.Getter;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +55,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
   private final String sourceFile;
   private final String targetFile;
 
+  @Getter
   protected Schema sourceSchema;
 
   protected Schema targetSchema;
@@ -74,11 +76,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     return readSchemaFromFile(schemaFile, this.fs, config);
   }
 
-  @Override
-  public Schema getSourceSchema() {
-    return sourceSchema;
-  }
-
   @Override
   public Schema getTargetSchema() {
     if (targetSchema != null) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 6c58fb2739dc..7ec566b33db3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 
+import lombok.Getter;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
@@ -42,10 +43,11 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 /**
  * A schema provider to get data schema through user specified hive table.
  */
+@Getter
 public class HiveSchemaProvider extends SchemaProvider {
 
-  private final HoodieSchema sourceSchema;
-  private HoodieSchema targetSchema;
+  private final HoodieSchema sourceHoodieSchema;
+  private HoodieSchema targetHoodieSchema;
 
   public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
@@ -58,7 +60,7 @@ public class HiveSchemaProvider extends SchemaProvider {
     try {
       TableIdentifier sourceSchemaTable = new 
TableIdentifier(sourceSchemaTableName, 
scala.Option.apply(sourceSchemaDatabaseName));
       StructType sourceSchema = 
spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
-      this.sourceSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+      this.sourceHoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
           sourceSchema,
           sourceSchemaTableName,
           "hoodie." + sourceSchemaDatabaseName);
@@ -73,7 +75,7 @@ public class HiveSchemaProvider extends SchemaProvider {
       try {
         TableIdentifier targetSchemaTable = new 
TableIdentifier(targetSchemaTableName, 
scala.Option.apply(targetSchemaDatabaseName));
         StructType targetSchema = 
spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
-        this.targetSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+        this.targetHoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
             targetSchema,
             targetSchemaTableName,
             "hoodie." + targetSchemaDatabaseName);
@@ -84,14 +86,16 @@ public class HiveSchemaProvider extends SchemaProvider {
   }
 
   @Override
+  @Deprecated
   public Schema getSourceSchema() {
-    return sourceSchema.toAvroSchema();
+    return getSourceHoodieSchema().toAvroSchema();
   }
 
   @Override
+  @Deprecated
   public Schema getTargetSchema() {
-    if (targetSchema != null) {
-      return targetSchema.toAvroSchema();
+    if (getTargetHoodieSchema() != null) {
+      return getTargetHoodieSchema().toAvroSchema();
     } else {
       return super.getTargetSchema();
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index b23f7d24fbc1..159634c8818d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -40,13 +40,12 @@ import 
io.confluent.kafka.schemaregistry.client.rest.RestService;
 import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
@@ -79,8 +78,9 @@ import static 
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
  * <p>
  * https://github.com/confluentinc/schema-registry
  */
+@Slf4j
 public class SchemaRegistryProvider extends SchemaProvider {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SchemaRegistryProvider.class);
+
   private static final Pattern URL_PATTERN = 
Pattern.compile("(.*/)subjects/(.*)/versions/(.*)");
   private static final String LATEST = "latest";
 
@@ -195,7 +195,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
     } catch (IllegalAccessError error) {
       // If we're not processing Protobuf schema, fall back to the legacy 
method
       if (!ProtobufSchema.TYPE.equalsIgnoreCase(schemaType)) {
-        LOG.warn("Falling back to legacy schema retrieval due to 
IllegalAccessError", error);
+        log.warn("Falling back to legacy schema retrieval due to 
IllegalAccessError", error);
         return fetchSchemaUsingLegacyMethod(registryUrl);
       }
       // Otherwise, rethrow the error
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
index 854486e0343b..c9698feb9ad8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SimpleSchemaProvider.java
@@ -21,9 +21,11 @@ package org.apache.hudi.utilities.schema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
 
+import lombok.Getter;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 
+@Getter
 public class SimpleSchemaProvider extends SchemaProvider {
 
   private final Schema sourceSchema;
@@ -32,9 +34,4 @@ public class SimpleSchemaProvider extends SchemaProvider {
     super(props, jssc);
     this.sourceSchema = sourceSchema.toAvroSchema();
   }
-
-  @Override
-  public Schema getSourceSchema() {
-    return sourceSchema;
-  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
index acc9ad78cb2d..cb2079a9d25c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
@@ -26,10 +26,9 @@ import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,10 +37,9 @@ import java.util.List;
  * An implementation of {@link SchemaPostProcessor} which will add a column 
named "_hoodie_is_deleted" to the end of
  * a given schema.
  */
+@Slf4j
 public class DeleteSupportSchemaPostProcessor extends SchemaPostProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(DeleteSupportSchemaPostProcessor.class);
-
   public DeleteSupportSchemaPostProcessor(TypedProperties props, 
JavaSparkContext jssc) {
     super(props, jssc);
   }
@@ -55,7 +53,7 @@ public class DeleteSupportSchemaPostProcessor extends 
SchemaPostProcessor {
   @Override
   public HoodieSchema processSchema(HoodieSchema schema) {
     if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).isPresent()) {
-      LOG.warn("column {} already exists!", 
HoodieRecord.HOODIE_IS_DELETED_FIELD);
+      log.warn("column {} already exists!", 
HoodieRecord.HOODIE_IS_DELETED_FIELD);
       return schema;
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
index 703dea14544f..bb3de9344ea7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
@@ -27,10 +27,9 @@ import 
org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -49,10 +48,9 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  * <p>
  * 
properties.put("hoodie.streamer.schemaprovider.schema_post_processor.delete.columns",
 "column1,column2").
  */
+@Slf4j
 public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(DropColumnSchemaPostProcessor.class);
-
   public DropColumnSchemaPostProcessor(TypedProperties props, JavaSparkContext 
jssc) {
     super(props, jssc);
   }
@@ -77,7 +75,7 @@ public class DropColumnSchemaPostProcessor extends 
SchemaPostProcessor {
         this.config, 
SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN);
 
     if (StringUtils.isNullOrEmpty(columnToDeleteStr)) {
-      LOG.warn("Param {} is null or empty, return original schema",
+      log.warn("Param {} is null or empty, return original schema",
           
SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key());
     }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index 726770e5ad4a..79ad5ec3dcf1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -27,6 +27,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -141,6 +143,8 @@ public class ChainedTransformer implements Transformer {
   }
 
   protected static class TransformerInfo {
+
+    @Getter(AccessLevel.PROTECTED)
     private final Transformer transformer;
     private final Option<String> idOpt;
 
@@ -154,10 +158,6 @@ public class ChainedTransformer implements Transformer {
       this.idOpt = Option.empty();
     }
 
-    protected Transformer getTransformer() {
-      return transformer;
-    }
-
     private boolean hasIdentifier() {
       return idOpt.isPresent();
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
index 1256491528d1..76f21a308e39 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -21,24 +21,23 @@ package org.apache.hudi.utilities.transform;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 
 /**
  * Transformer that can flatten nested objects. It currently doesn't unnest 
arrays.
  */
+@Slf4j
 public class FlatteningTransformer implements Transformer {
 
   private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlatteningTransformer.class);
 
   /**
    * Configs supported.
@@ -49,7 +48,7 @@ public class FlatteningTransformer implements Transformer {
     try {
       // tmp table name doesn't like dashes
       String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-      LOG.info("Registering tmp table : " + tmpTable);
+      log.info("Registering tmp table: {}", tmpTable);
       rowDataset.createOrReplaceTempView(tmpTable);
       Dataset<Row> transformed = sparkSession.sql("select " + 
flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
       sparkSession.catalog().dropTempView(tmpTable);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
index cdef1677e587..1e8af287f809 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java
@@ -23,14 +23,13 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.utilities.config.SqlTransformerConfig;
 import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Scanner;
@@ -56,10 +55,9 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  * <p>
  * SELECT * FROM tmp_personal_trips;
  */
+@Slf4j
 public class SqlFileBasedTransformer implements Transformer {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SqlFileBasedTransformer.class);
-
   private static final String SRC_PATTERN = "<SRC>";
   private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
 
@@ -75,19 +73,19 @@ public class SqlFileBasedTransformer implements Transformer 
{
     final FileSystem fs = HadoopFSUtils.getFs(sqlFile, 
jsc.hadoopConfiguration(), true);
     // tmp table name doesn't like dashes
     final String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-    LOG.info("Registering tmp table: {}", tmpTable);
+    log.info("Registering tmp table: {}", tmpTable);
     rowDataset.createOrReplaceTempView(tmpTable);
 
     try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), 
"UTF-8")) {
       Dataset<Row> rows = null;
       // each sql statement is separated with semicolon hence set that as 
delimiter.
       scanner.useDelimiter(";");
-      LOG.info("SQL Query for transformation:");
+      log.info("SQL Query for transformation:");
       while (scanner.hasNext()) {
         String sqlStr = scanner.next();
         sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim();
         if (!sqlStr.isEmpty()) {
-          LOG.info(sqlStr);
+          log.info(sqlStr);
           // overwrite the same dataset object until the last statement then 
return.
           rows = sparkSession.sql(sqlStr);
         }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index 290e3a69432a..9013ab04b9b3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -22,12 +22,11 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.utilities.config.SqlTransformerConfig;
 import org.apache.hudi.utilities.exception.HoodieTransformExecutionException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 
@@ -38,10 +37,9 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  *
  * The query should reference the source as a table named "\<SRC\>"
  */
+@Slf4j
 public class SqlQueryBasedTransformer implements Transformer {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SqlQueryBasedTransformer.class);
-
   private static final String SRC_PATTERN = "<SRC>";
   private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
 
@@ -53,10 +51,10 @@ public class SqlQueryBasedTransformer implements 
Transformer {
     try {
       // tmp table name doesn't like dashes
       String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
-      LOG.info("Registering tmp table: {}", tmpTable);
+      log.info("Registering tmp table: {}", tmpTable);
       rowDataset.createOrReplaceTempView(tmpTable);
       String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
-      LOG.debug("SQL Query for transformation: {}", sqlStr);
+      log.debug("SQL Query for transformation: {}", sqlStr);
       Dataset<Row> transformed = sparkSession.sql(sqlStr);
       sparkSession.catalog().dropTempView(tmpTable);
       return transformed;


Reply via email to