This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 612e327b51fb refactor: Add Lombok annotations to hudi-common module 
(part 7) (#18944)
612e327b51fb is described below

commit 612e327b51fbb4fc792dcaeecf47aa86f5ddab89
Author: voonhous <[email protected]>
AuthorDate: Fri Jun 12 09:53:08 2026 +0800

    refactor: Add Lombok annotations to hudi-common module (part 7) (#18944)
    
    * refactor: Add Lombok annotations to hudi-common module (part 7)
    
    * refactor: Address review comments
    
    - HoodieLogBlock: revert lombok @NonNull to javax.annotation.Nonnull; null
      header/footer is expected on the read path (v2/v3 blocks have no footer),
      so runtime null checks would NPE on every log block read
    - IncrementalQueryAnalyzer: make QueryContext constructor private again via
      @AllArgsConstructor(access = AccessLevel.PRIVATE); fix EMPTY constant to
      pass Option.empty() instead of null
    - HoodieTableConfig: use parameterized logging instead of String.format
    - HoodieTableVersion: document why versionCode uses @Accessors(fluent = 
true)
---
 .../hudi/common/table/HoodieTableConfig.java       |  24 ++--
 .../hudi/common/table/HoodieTableMetaClient.java   | 159 +++++++--------------
 .../hudi/common/table/HoodieTableVersion.java      |  24 ++--
 .../hudi/common/table/TableSchemaResolver.java     |  12 +-
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  |   6 +-
 .../hudi/common/table/cdc/HoodieCDCFileSplit.java  |  24 +---
 .../hudi/common/table/cdc/HoodieCDCOperation.java  |  14 +-
 .../hudi/common/table/checkpoint/Checkpoint.java   |  21 ++-
 .../table/log/AbstractHoodieLogRecordScanner.java  |  92 ++++--------
 .../table/log/BaseHoodieLogRecordReader.java       | 103 +++++--------
 .../apache/hudi/common/table/log/FullKeySpec.java  |  15 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  26 ++--
 .../hudi/common/table/log/HoodieLogFormat.java     |  12 +-
 .../common/table/log/HoodieLogFormatReader.java    |   8 +-
 .../table/log/HoodieMergedLogRecordReader.java     |  20 +--
 .../table/log/HoodieMergedLogRecordScanner.java    |  20 +--
 .../apache/hudi/common/table/log/InstantRange.java |  17 +--
 .../common/table/log/block/HoodieCommandBlock.java |   7 +-
 .../common/table/log/block/HoodieDataBlock.java    |  11 +-
 .../common/table/log/block/HoodieDeleteBlock.java  |   6 +-
 .../table/log/block/HoodieHFileDataBlock.java      |   2 +
 .../common/table/log/block/HoodieLogBlock.java     |  94 +++---------
 .../hudi/common/table/read/BufferedRecord.java     |  47 ++----
 .../table/read/BufferedRecordMergerFactory.java    |   7 +-
 .../hudi/common/table/read/DeleteContext.java      |  22 +--
 .../table/read/IncrementalQueryAnalyzer.java       |  70 +++------
 .../buffer/DefaultFileGroupRecordBufferLoader.java |   8 +-
 .../table/read/buffer/FileGroupRecordBuffer.java   |  19 +--
 .../buffer/PositionBasedFileGroupRecordBuffer.java |  15 +-
 .../source/split/HoodieSourceSplitSerializer.java  |  12 +-
 .../hudi/source/TestIncrementalInputSplits.java    |   4 +-
 .../source/TestStreamReadMonitoringFunction.java   |   4 +-
 .../hudi/source/split/TestHoodieSourceSplit.java   |  14 +-
 .../split/TestHoodieSourceSplitSerializer.java     |  76 +++++-----
 34 files changed, 359 insertions(+), 656 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index ca775136257c..73703d70d85a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -65,8 +65,7 @@ import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -119,6 +118,7 @@ import static 
org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 @Immutable
+@Slf4j
 @ConfigClassProperty(name = "Hudi Table Basic Configs",
     groupName = ConfigGroups.Names.TABLE_CONFIG,
     description = "Configurations of the Hudi Table like type of ingestion, 
storage formats, hive table name etc."
@@ -126,8 +126,6 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument;
         + " initializing a path as hoodie base path and never changes during 
the lifetime of a hoodie table.")
 public class HoodieTableConfig extends HoodieConfig {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTableConfig.class);
-
   public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
   public static final String HOODIE_PROPERTIES_FILE_BACKUP = 
"hoodie.properties.backup";
   public static final String HOODIE_WRITE_TABLE_NAME_KEY = 
"hoodie.datasource.write.table.name";
@@ -475,7 +473,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public HoodieTableConfig(HoodieStorage storage, StoragePath metaPath) {
     super();
     StoragePath propertyPath = new StoragePath(metaPath, 
HOODIE_PROPERTIES_FILE);
-    LOG.info("Loading table properties from " + propertyPath);
+    log.info("Loading table properties from {}", propertyPath);
     try {
       this.props = fetchConfigs(storage, metaPath, HOODIE_PROPERTIES_FILE, 
HOODIE_PROPERTIES_FILE_BACKUP, MAX_READ_RETRIES, READ_RETRY_DELAY_MSEC);
     } catch (IOException e) {
@@ -509,7 +507,7 @@ public class HoodieTableConfig extends HoodieConfig {
       checksum = propsWithChecksum.getProperty(TABLE_CHECKSUM.key());
       props.setProperty(TABLE_CHECKSUM.key(), checksum);
     }
-    LOG.info("Created properties file at " + propertyPath);
+    log.info("Created properties file at {}", propertyPath);
     return checksum;
   }
 
@@ -556,7 +554,7 @@ public class HoodieTableConfig extends HoodieConfig {
         propsToDelete.forEach(propToDelete -> props.remove(propToDelete));
         checksum = storeProperties(props, out, cfgPath);
       }
-      LOG.warn(String.format("%s modified to: %s (at %s)", cfgPath.getName(), 
props, cfgPath.getParent()));
+      log.warn("{} modified to: {} (at {})", cfgPath.getName(), props, 
cfgPath.getParent());
 
       // 5. verify and remove backup.
       try (InputStream in = storage.open(cfgPath)) {
@@ -582,7 +580,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private static void deleteFile(HoodieStorage storage, StoragePath cfgPath) 
throws IOException {
     storage.deleteFile(cfgPath);
-    LOG.info("Deleted properties file at " + cfgPath);
+    log.info("Deleted properties file at {}", cfgPath);
   }
 
   /**
@@ -685,7 +683,7 @@ public class HoodieTableConfig extends HoodieConfig {
     boolean valid = tableVersion.greaterThan(firstVersion) || 
tableVersion.equals(firstVersion);
     valid = valid || 
CONFIGS_REQUIRED_FOR_OLDER_VERSIONED_TABLES.contains(configProperty.key());
     if (!valid) {
-      LOG.warn("Table version {} is lower than or equal to config's first 
version {}. Config {} will be ignored.",
+      log.warn("Table version {} is lower than or equal to config's first 
version {}. Config {} will be ignored.",
           tableVersion, firstVersion, configProperty.key());
     }
     return valid;
@@ -1009,12 +1007,12 @@ public class HoodieTableConfig extends HoodieConfig {
     // Check ordering field name based on record merge mode
     if (inferredRecordMergeMode == COMMIT_TIME_ORDERING) {
       if (nonEmpty(orderingFieldNamesAsString)) {
-        LOG.warn("The ordering field ({}) is specified. COMMIT_TIME_ORDERING "
+        log.warn("The ordering field ({}) is specified. COMMIT_TIME_ORDERING "
             + "merge mode does not use ordering field anymore.", 
orderingFieldNamesAsString);
       }
     } else if (inferredRecordMergeMode == EVENT_TIME_ORDERING) {
       if (isNullOrEmpty(orderingFieldNamesAsString)) {
-        LOG.warn("The ordering field is not specified. EVENT_TIME_ORDERING "
+        log.warn("The ordering field is not specified. EVENT_TIME_ORDERING "
             + "merge mode requires ordering field to be set for getting the "
             + "event time. Using commit time-based ordering now.");
       }
@@ -1324,7 +1322,7 @@ public class HoodieTableConfig extends HoodieConfig {
     setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
-    LOG.info("MDT {} partition {} has been {}", metaClient.getBasePath(), 
partitionPath, enabled ? "enabled" : "disabled");
+    log.info("MDT {} partition {} has been {}", metaClient.getBasePath(), 
partitionPath, enabled ? "enabled" : "disabled");
   }
 
   /**
@@ -1342,7 +1340,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
     setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
     update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
-    LOG.info("MDT {} partitions {} have been set to inflight", 
metaClient.getBasePath(), partitionPaths);
+    log.info("MDT {} partitions {} have been set to inflight", 
metaClient.getBasePath(), partitionPaths);
   }
 
   public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient, 
MetadataPartitionType... partitionTypes) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index d724dda09f06..255c1d401f51 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -74,8 +74,11 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -118,10 +121,12 @@ import static 
org.apache.hudi.metadata.HoodieIndexVersion.isValidIndexDefinition
  * @see HoodieTimeline
  * @since 0.3.0
  */
+@NoArgsConstructor
+@Getter
+@Slf4j
 public class HoodieTableMetaClient implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTableMetaClient.class);
   public static final String METADATA_STR = "metadata";
   public static final String METAFOLDER_NAME = ".hoodie";
   public static final String TIMELINEFOLDER_NAME = "timeline";
@@ -163,20 +168,27 @@ public class HoodieTableMetaClient implements 
Serializable {
   protected StoragePath basePath;
   protected StoragePath metaPath;
 
+  @Getter(AccessLevel.NONE)
+  @Setter
   private transient HoodieStorage storage;
+  @Getter(AccessLevel.NONE)
   private boolean loadActiveTimelineOnLoad;
   protected StorageConfiguration<?> storageConf;
   private HoodieTableType tableType;
   private TimelineLayoutVersion timelineLayoutVersion;
   private TimelineLayout timelineLayout;
   private StoragePath timelinePath;
+  @Getter(AccessLevel.NONE)
   private StoragePath timelineHistoryPath;
   protected HoodieTableConfig tableConfig;
+  @Getter(AccessLevel.NONE)
   protected HoodieActiveTimeline activeTimeline;
   private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
   private FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().build();
+  @Getter(AccessLevel.NONE)
   protected HoodieMetaserverConfig metaserverConfig;
   private HoodieTimeGeneratorConfig timeGeneratorConfig;
+  @Getter(AccessLevel.NONE)
   private Option<HoodieIndexMetadata> indexMetadataOpt;
   private HoodieTableFormat tableFormat;
 
@@ -187,7 +199,7 @@ public class HoodieTableMetaClient implements Serializable {
   protected HoodieTableMetaClient(HoodieStorage storage, String basePath, 
boolean loadActiveTimelineOnLoad,
                                   ConsistencyGuardConfig 
consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
                                   HoodieTimeGeneratorConfig 
timeGeneratorConfig, FileSystemRetryConfig fileSystemRetryConfig) {
-    LOG.debug("Loading HoodieTableMetaClient from " + basePath);
+    log.debug("Loading HoodieTableMetaClient from {}", basePath);
     this.timeGeneratorConfig = timeGeneratorConfig;
     this.consistencyGuardConfig = consistencyGuardConfig;
     this.fileSystemRetryConfig = fileSystemRetryConfig;
@@ -213,21 +225,13 @@ public class HoodieTableMetaClient implements 
Serializable {
     this.timelinePath = 
timelineLayout.getTimelinePathProvider().getTimelinePath(tableConfig, 
this.basePath);
     this.timelineHistoryPath = 
timelineLayout.getTimelinePathProvider().getTimelineHistoryPath(tableConfig, 
this.basePath);
     this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
-    LOG.debug("Finished Loading Table of type " + tableType + "(version=" + 
timelineLayoutVersion + ") from " + basePath);
+    log.debug("Finished Loading Table of type {}(version={}) from {}", 
tableType, timelineLayoutVersion, basePath);
     if (loadActiveTimelineOnLoad) {
-      LOG.info("Loading Active commit timeline for " + basePath);
+      log.info("Loading Active commit timeline for {}", basePath);
       getActiveTimeline();
     }
   }
 
-  /**
-   * For serializing and de-serializing.
-   *
-   * @deprecated
-   */
-  public HoodieTableMetaClient() {
-  }
-
   public String getIndexDefinitionPath() {
     return tableConfig.getRelativeIndexDefinitionPath()
         .map(definitionPath -> new StoragePath(basePath, 
definitionPath).toString())
@@ -250,7 +254,7 @@ public class HoodieTableMetaClient implements Serializable {
       Option<HoodieIndexDefinition> existingIndexOpt = 
indexMetadataOpt.get().getIndex(indexName);
       if (existingIndexOpt.isPresent()) {
         if 
(!existingIndexOpt.get().getSourceFields().equals(indexDefinition.getSourceFields()))
 {
-          LOG.info("List of columns to index is changing. Old value {}. New 
value {}", existingIndexOpt.get().getSourceFields(),
+          log.info("List of columns to index is changing. Old value {}. New 
value {}", existingIndexOpt.get().getSourceFields(),
               indexDefinition.getSourceFields());
           indexMetadataOpt.get().getIndexDefinitions().put(indexName, 
indexDefinition);
         } else {
@@ -386,35 +390,6 @@ public class HoodieTableMetaClient implements Serializable 
{
     out.defaultWriteObject();
   }
 
-  /**
-   * Returns base path of the table
-   */
-  public StoragePath getBasePath() {
-    return basePath; // this invocation is cached
-  }
-
-  /**
-   * @return Hoodie Table Type
-   */
-  public HoodieTableType getTableType() {
-    return tableType;
-  }
-
-  /**
-   * @return Meta path
-   */
-  public StoragePath getMetaPath() {
-    return metaPath;
-  }
-
-  public StoragePath getTimelinePath() {
-    return timelinePath;
-  }
-
-  public HoodieTableFormat getTableFormat() {
-    return tableFormat;
-  }
-
   /**
    * @return schema folder path
    */
@@ -492,21 +467,6 @@ public class HoodieTableMetaClient implements Serializable 
{
     return timelineHistoryPath;
   }
 
-  /**
-   * @return Table Config
-   */
-  public HoodieTableConfig getTableConfig() {
-    return tableConfig;
-  }
-
-  public TimelineLayoutVersion getTimelineLayoutVersion() {
-    return timelineLayoutVersion;
-  }
-
-  public TimelineLayout getTimelineLayout() {
-    return timelineLayout;
-  }
-
   public boolean isMetadataTable() {
     return HoodieTableMetadata.isMetadataTable(getBasePath());
   }
@@ -540,18 +500,10 @@ public class HoodieTableMetaClient implements 
Serializable {
         consistencyGuard);
   }
 
-  public void setStorage(HoodieStorage storage) {
-    this.storage = storage;
-  }
-
   public HoodieStorage getRawStorage() {
     return getStorage().getRawStorage();
   }
 
-  public StorageConfiguration<?> getStorageConf() {
-    return storageConf;
-  }
-
   /**
    * Get the active instants as a timeline.
    *
@@ -621,18 +573,6 @@ public class HoodieTableMetaClient implements Serializable 
{
     return TimelineUtils.generateInstantTime(shouldLock, timeGenerator);
   }
 
-  public HoodieTimeGeneratorConfig getTimeGeneratorConfig() {
-    return timeGeneratorConfig;
-  }
-
-  public ConsistencyGuardConfig getConsistencyGuardConfig() {
-    return consistencyGuardConfig;
-  }
-
-  public FileSystemRetryConfig getFileSystemRetryConfig() {
-    return fileSystemRetryConfig;
-  }
-
   /**
    * Get the archived commits as a timeline. This is costly operation, as all 
data from the archived files are read.
    * This should not be used, unless for historical debugging purposes.
@@ -696,7 +636,7 @@ public class HoodieTableMetaClient implements Serializable {
                                                 Properties props,
                                                 Integer timelineLayout,
                                                 boolean 
shouldCreateTableConfig) throws IOException {
-    LOG.info("Initializing {} as hoodie table", basePath);
+    log.info("Initializing {} as hoodie table", basePath);
     final HoodieStorage storage = HoodieStorageUtils.getStorage(basePath, 
storageConf);
     if (!storage.exists(basePath)) {
       storage.createDirectory(basePath);
@@ -871,31 +811,6 @@ public class HoodieTableMetaClient implements Serializable 
{
     return instantStream.sorted().collect(Collectors.toList());
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    HoodieTableMetaClient that = (HoodieTableMetaClient) o;
-    return Objects.equals(basePath, that.basePath) && tableType == 
that.tableType;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(basePath, tableType);
-  }
-
-  @Override
-  public String toString() {
-    return "HoodieTableMetaClient{" + "basePath='" + basePath + '\''
-        + ", metaPath='" + metaPath + '\''
-        + ", tableType=" + tableType
-        + '}';
-  }
-
   public void initializeBootstrapDirsIfNotExists() throws IOException {
     initializeBootstrapDirsIfNotExists(basePath, getStorage());
   }
@@ -1037,6 +952,34 @@ public class HoodieTableMetaClient implements 
Serializable {
     return getTimelineLayout().getCommitMetadataSerDe();
   }
 
+  // Not using Lombok @EqualsAndHashCode/@ToString here: this class is 
subclassed (e.g. HoodieTableMetaserverClient),
+  // and we rely on the runtime subtype - exact-class matching in equals() and 
the declaring-class behavior below.
+  // Lombok would switch equals() to instanceof, making a base instance 
compare equal to a subtype instance.
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HoodieTableMetaClient that = (HoodieTableMetaClient) o;
+    return Objects.equals(basePath, that.basePath) && tableType == 
that.tableType;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(basePath, tableType);
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieTableMetaClient{" + "basePath='" + basePath + '\''
+        + ", metaPath='" + metaPath + '\''
+        + ", tableType=" + tableType
+        + '}';
+  }
+
   public static TableBuilder newTableBuilder() {
     return new TableBuilder();
   }
@@ -1044,6 +987,7 @@ public class HoodieTableMetaClient implements Serializable 
{
   /**
    * Builder for {@link Properties}.
    */
+  @NoArgsConstructor(access = AccessLevel.PACKAGE)
   public static class TableBuilder {
 
     private HoodieTableType tableType;
@@ -1093,9 +1037,6 @@ public class HoodieTableMetaClient implements 
Serializable {
      */
     private final Properties others = new Properties();
 
-    TableBuilder() {
-    }
-
     public TableBuilder setTableType(HoodieTableType tableType) {
       this.tableType = tableType;
       return this;
@@ -1667,7 +1608,7 @@ public class HoodieTableMetaClient implements 
Serializable {
       HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(basePath)
           .setMetaserverConfig(props)
           .build();
-      LOG.info("Finished initializing Table of type {} from {}", 
metaClient.getTableConfig().getTableType(), basePath);
+      log.info("Finished initializing Table of type {} from {}", 
metaClient.getTableConfig().getTableType(), basePath);
       return metaClient;
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
index 945ed2366607..7ec8445809c9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java
@@ -22,6 +22,11 @@ import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
 
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
 import java.util.Arrays;
 import java.util.List;
 
@@ -29,7 +34,10 @@ import java.util.List;
  * Table's version that controls what version of writer/readers can actually 
read/write
  * to a given table.
  */
+@AllArgsConstructor
+@Getter
 public enum HoodieTableVersion {
+
   // < 0.6.0 versions
   ZERO(0, CollectionUtils.createImmutableList("0.3.0"), 
TimelineLayoutVersion.LAYOUT_VERSION_0),
   // 0.6.0 onwards
@@ -51,26 +59,14 @@ public enum HoodieTableVersion {
   // 1.1
   NINE(9, CollectionUtils.createImmutableList("1.1.0"), 
TimelineLayoutVersion.LAYOUT_VERSION_2);
 
+  @Accessors(fluent = true) // Required so that #versionCode() is generated 
instead of #getVersionCode() by Lombok
   private final int versionCode;
 
+  @Getter(AccessLevel.NONE)
   private final List<String> releaseVersions;
 
   private final TimelineLayoutVersion timelineLayoutVersion;
 
-  HoodieTableVersion(int versionCode, List<String> releaseVersions, 
TimelineLayoutVersion timelineLayoutVersion) {
-    this.versionCode = versionCode;
-    this.releaseVersions = releaseVersions;
-    this.timelineLayoutVersion = timelineLayoutVersion;
-  }
-
-  public TimelineLayoutVersion getTimelineLayoutVersion() {
-    return timelineLayoutVersion;
-  }
-
-  public int versionCode() {
-    return versionCode;
-  }
-
   public static HoodieTableVersion current() {
     return NINE;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index ee2cfbf0d362..157f9d4c79ca 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -49,8 +49,7 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -67,11 +66,10 @@ import java.util.stream.Stream;
 /**
  * Helper class to read schema from data files and log files and to convert it 
between different formats.
  */
+@Slf4j
 @ThreadSafe
 public class TableSchemaResolver {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TableSchemaResolver.class);
-
   protected final HoodieTableMetaClient metaClient;
 
   /**
@@ -254,11 +252,11 @@ public class TableSchemaResolver {
               .map(writeStat -> new StoragePath(metaClient.getBasePath(), 
writeStat.getPath()));
           return Option.of(fetchSchemaFromFiles(filePaths));
         } else {
-          LOG.debug("Could not find any data file written for commit, so could 
not get schema for table {}", metaClient.getBasePath());
+          log.debug("Could not find any data file written for commit, so could 
not get schema for table {}", metaClient.getBasePath());
           return Option.empty();
         }
       default:
-        LOG.error("Unknown table type {}", metaClient.getTableType());
+        log.error("Unknown table type {}", metaClient.getTableType());
         throw new InvalidTableException(metaClient.getBasePath().toString());
     }
   }
@@ -373,7 +371,7 @@ public class TableSchemaResolver {
       HoodieSchema tableSchema = getTableSchemaFromDataFile();
       return 
tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent();
     } catch (Exception e) {
-      LOG.info("Failed to read operation field from schema ({})", 
e.getMessage());
+      log.info("Failed to read operation field from schema ({})", 
e.getMessage());
       return false;
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 612837c4dc4d..beb9b56bc66a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -219,9 +219,9 @@ public class HoodieCDCExtractor {
     try {
       Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, CLUSTERING_ACTION));
       HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
-      if (instantRange.getStartInstant().isPresent() && 
!metaClient.getArchivedTimeline().empty()
-          && 
InstantComparison.compareTimestamps(metaClient.getArchivedTimeline().lastInstant().get().requestedTime(),
 InstantComparison.GREATER_THAN, instantRange.getStartInstant().get())) {
-        throw new HoodieException("Start instant time " + 
instantRange.getStartInstant().get()
+      if (instantRange.getStartInstantOpt().isPresent() && 
!metaClient.getArchivedTimeline().empty()
+          && 
InstantComparison.compareTimestamps(metaClient.getArchivedTimeline().lastInstant().get().requestedTime(),
 InstantComparison.GREATER_THAN, instantRange.getStartInstantOpt().get())) {
+        throw new HoodieException("Start instant time " + 
instantRange.getStartInstantOpt().get()
             + " for CDC query has to be in the active timeline. Beginning of 
active timeline " + activeTimeLine.firstInstant().get().requestedTime());
       }
       this.commits = activeTimeLine.getInstantsAsStream()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
index c33df8414bd2..24fb94356a0d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.util.Option;
 
+import lombok.Getter;
+
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,7 +47,9 @@ import java.util.stream.Collectors;
  * For `cdcInferCase` = {@link HoodieCDCInferenceCase#REPLACE_COMMIT}, 
`cdcFile` is null,
  * `beforeFileSlice` is the current version of the file slice.
  */
+@Getter
 public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFileSplit> {
+
   /**
    * The instant time at which the changes happened.
    */
@@ -103,26 +107,6 @@ public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFil
     this.afterFileSlice = afterFileSlice;
   }
 
-  public String getInstant() {
-    return this.instant;
-  }
-
-  public HoodieCDCInferenceCase getCdcInferCase() {
-    return this.cdcInferCase;
-  }
-
-  public List<String> getCdcFiles() {
-    return this.cdcFiles;
-  }
-
-  public Option<FileSlice> getBeforeFileSlice() {
-    return this.beforeFileSlice;
-  }
-
-  public Option<FileSlice> getAfterFileSlice() {
-    return this.afterFileSlice;
-  }
-
   @Override
   public int compareTo(HoodieCDCFileSplit o) {
     int cmpResult = this.instant.compareTo(o.instant);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
index 90540bc05a69..2cb0f19687e0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
@@ -20,9 +20,15 @@ package org.apache.hudi.common.table.cdc;
 
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 /**
  * Enumeration of change log operation.
  */
+@AllArgsConstructor(access = AccessLevel.PACKAGE)
+@Getter
 public enum HoodieCDCOperation {
   INSERT("i"),
   UPDATE("u"),
@@ -30,14 +36,6 @@ public enum HoodieCDCOperation {
 
   private final String value;
 
-  HoodieCDCOperation(String value) {
-    this.value = value;
-  }
-
-  public String getValue() {
-    return this.value;
-  }
-
   public static HoodieCDCOperation parse(String value) {
     switch (value) {
       case "i":
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
index 67248ba4adcd..eea1c32dd373 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.table.checkpoint;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,13 +30,16 @@ import java.util.Objects;
 /**
  * Class for representing checkpoint
  */
+@Getter
 public abstract class Checkpoint implements Serializable {
+
   public static final String CHECKPOINT_IGNORE_KEY = 
"deltastreamer.checkpoint.ignore_key";
 
   protected String checkpointKey;
   protected String checkpointResetKey;
   protected String checkpointIgnoreKey;
   // These are extra props to be written to the commit metadata
+  @Getter(AccessLevel.NONE)
   protected Map<String, String> extraProps = new HashMap<>();
 
   public Checkpoint setCheckpointKey(String newKey) {
@@ -41,21 +47,12 @@ public abstract class Checkpoint implements Serializable {
     return this;
   }
 
-  public String getCheckpointKey() {
-    return checkpointKey;
-  }
-
-  public String getCheckpointResetKey() {
-    return checkpointResetKey;
-  }
-
-  public String getCheckpointIgnoreKey() {
-    return checkpointIgnoreKey;
-  }
-
   public abstract Map<String, String> getCheckpointCommitMetadata(String 
overrideResetKey,
                                                                   String 
overrideIgnoreKey);
 
+  // Not using Lombok @EqualsAndHashCode/@ToString here: this class is 
subclassed, and we rely on
+  // the runtime subtype - exact-class matching in equals() and 
getClass().getSimpleName() in toString().
+  // Lombok would bake in the declaring class (Checkpoint) and switch equals() 
to instanceof.
   @Override
   public int hashCode() {
     return Objects.hashCode(checkpointKey);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index b1eb9e0e2c98..49c17eda75c8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -48,8 +48,10 @@ import 
org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -87,10 +89,9 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  * <p>
  * This results in two I/O passes over the log file.
  */
+@Slf4j
 public abstract class AbstractHoodieLogRecordScanner {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractHoodieLogRecordScanner.class);
-
   // Reader schema for the records
   protected final HoodieSchema readerSchema;
   // Latest valid instant time
@@ -98,14 +99,17 @@ public abstract class AbstractHoodieLogRecordScanner {
   private final String latestInstantTime;
   protected final HoodieTableMetaClient hoodieTableMetaClient;
   // Merge strategy to use when combining records from log
+  @Getter(AccessLevel.PROTECTED)
   private final String payloadClassFQN;
   // Record's key/partition-path fields
   private final String recordKeyField;
   private final Option<String> partitionPathFieldOpt;
   // Partition name override
+  @Getter
   private final Option<String> partitionNameOverrideOpt;
   // Stateless component for merging records
   protected final HoodieRecordMerger recordMerger;
+  @Getter(AccessLevel.PROTECTED)
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List<String> logFilePaths;
@@ -117,6 +121,7 @@ public abstract class AbstractHoodieLogRecordScanner {
   // optional instant range for incremental block filtering
   private final Option<InstantRange> instantRange;
   // Read the operation metadata field from the avro record
+  @Getter
   private final boolean withOperationField;
   private final HoodieStorage storage;
   // Total log files read - for metrics
@@ -132,16 +137,19 @@ public abstract class AbstractHoodieLogRecordScanner {
   // Total number of corrupt blocks written across all log files
   private AtomicLong totalCorruptBlocks = new AtomicLong(0);
   // Store the last instant log blocks (needed to implement rollback)
+  @Getter
   private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
   // Enables full scan of log records
   protected final boolean forceFullScan;
   // Progress
+  @Getter
   private float progress = 0.0f;
   // Populate meta fields for the records
   private final boolean populateMetaFields;
   // Record type read from log block
   protected final HoodieRecordType recordType;
   // Collect all the block instants after scanning all the log files.
+  @Getter
   private final List<String> validBlockInstants = new ArrayList<>();
   // table version for compatibility
   private final HoodieTableVersion tableVersion;
@@ -301,7 +309,7 @@ public abstract class AbstractHoodieLogRecordScanner {
        */
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.info("Scanning log file {}", logFile);
+        log.info("Scanning log file {}", logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
@@ -310,7 +318,7 @@ public abstract class AbstractHoodieLogRecordScanner {
         totalLogBlocks.incrementAndGet();
         // Ignore the corrupt blocks. No further handling is required for them.
         if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
-          LOG.info("Found a corrupt block in {}", logFile.getPath());
+          log.info("Found a corrupt block in {}", logFile.getPath());
           totalCorruptBlocks.incrementAndGet();
           continue;
         }
@@ -347,7 +355,7 @@ public abstract class AbstractHoodieLogRecordScanner {
             instantToBlocksMap.put(instantTime, logBlocksList);
             break;
           case COMMAND_BLOCK:
-            LOG.info("Reading a command block from file {}", 
logFile.getPath());
+            log.info("Reading a command block from file {}", 
logFile.getPath());
             // This is a command block - take appropriate action based on the 
command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
 
@@ -367,8 +375,8 @@ public abstract class AbstractHoodieLogRecordScanner {
         }
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ordered instant times seen {}", orderedInstantsList);
+      if (log.isDebugEnabled()) {
+        log.debug("Ordered instant times seen {}", orderedInstantsList);
       }
 
       int numBlocksRolledBack = 0;
@@ -424,24 +432,24 @@ public abstract class AbstractHoodieLogRecordScanner {
           validBlockInstants.add(compactedFinalInstantTime);
         }
       }
-      LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
+      log.info("Number of applied rollback blocks {}", numBlocksRolledBack);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.info("Final view of the Block time to compactionBlockMap {}", 
blockTimeToCompactionBlockTimeMap);
+      if (log.isDebugEnabled()) {
+        log.info("Final view of the Block time to compactionBlockMap {}", 
blockTimeToCompactionBlockTimeMap);
       }
 
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
-        LOG.info("Merging the final data blocks");
+        log.info("Merging the final data blocks");
         processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
       }
       // Done
       progress = 1.0f;
     } catch (IOException e) {
-      LOG.error("Got IOException when reading log file", e);
+      log.error("Got IOException when reading log file", e);
       throw new HoodieIOException("IOException when reading log file ", e);
     } catch (Exception e) {
-      LOG.error("Got exception when reading log file", e);
+      log.error("Got exception when reading log file", e);
       throw new HoodieException("Exception when reading log file ", e);
     } finally {
       try {
@@ -450,7 +458,7 @@ public abstract class AbstractHoodieLogRecordScanner {
         }
       } catch (IOException ioe) {
         // Eat exception as we do not want to mask the original exception that 
can happen
-        LOG.error("Unable to close log format reader", ioe);
+        log.error("Unable to close log format reader", ioe);
       }
     }
   }
@@ -506,7 +514,7 @@ public abstract class AbstractHoodieLogRecordScanner {
   private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, 
int numLogFilesSeen,
                                              Option<KeySpec> keySpecOpt) 
throws Exception {
     while (!logBlocks.isEmpty()) {
-      LOG.info("Number of remaining logblocks to merge {}", logBlocks.size());
+      log.info("Number of remaining logblocks to merge {}", logBlocks.size());
       // poll the element at the bottom of the stack since that's the order it 
was inserted
       HoodieLogBlock lastBlock = logBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
@@ -519,7 +527,7 @@ public abstract class AbstractHoodieLogRecordScanner {
           Arrays.stream(((HoodieDeleteBlock) 
lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
           break;
         case CORRUPT_BLOCK:
-          LOG.warn("Found a corrupt block which was not rolled back");
+          log.warn("Found a corrupt block which was not rolled back");
           break;
         default:
           break;
@@ -535,13 +543,6 @@ public abstract class AbstractHoodieLogRecordScanner {
     return !forceFullScan;
   }
 
-  /**
-   * Return progress of scanning as a float between 0.0 to 1.0.
-   */
-  public float getProgress() {
-    return progress;
-  }
-
   public long getTotalLogFiles() {
     return totalLogFiles.get();
   }
@@ -554,14 +555,6 @@ public abstract class AbstractHoodieLogRecordScanner {
     return totalLogBlocks.get();
   }
 
-  protected String getPayloadClassFQN() {
-    return payloadClassFQN;
-  }
-
-  public Option<String> getPartitionNameOverride() {
-    return partitionNameOverrideOpt;
-  }
-
   public long getTotalRollbacks() {
     return totalRollbacks.get();
   }
@@ -570,14 +563,6 @@ public abstract class AbstractHoodieLogRecordScanner {
     return totalCorruptBlocks.get();
   }
 
-  public boolean isWithOperationField() {
-    return withOperationField;
-  }
-
-  protected TypedProperties getPayloadProps() {
-    return payloadProps;
-  }
-
   /**
    * Key specification with a list of column names.
    */
@@ -595,16 +580,11 @@ public abstract class AbstractHoodieLogRecordScanner {
     }
   }
 
+  @AllArgsConstructor
+  @Getter
   private static class FullKeySpec implements KeySpec {
-    private final List<String> keys;
-    private FullKeySpec(List<String> keys) {
-      this.keys = keys;
-    }
 
-    @Override
-    public List<String> getKeys() {
-      return keys;
-    }
+    private final List<String> keys;
 
     @Override
     public boolean isFullKey() {
@@ -612,12 +592,10 @@ public abstract class AbstractHoodieLogRecordScanner {
     }
   }
 
+  @AllArgsConstructor
   private static class PrefixKeySpec implements KeySpec {
-    private final List<String> keysPrefixes;
 
-    private PrefixKeySpec(List<String> keysPrefixes) {
-      this.keysPrefixes = keysPrefixes;
-    }
+    private final List<String> keysPrefixes;
 
     @Override
     public List<String> getKeys() {
@@ -630,14 +608,6 @@ public abstract class AbstractHoodieLogRecordScanner {
     }
   }
 
-  public Deque<HoodieLogBlock> getCurrentInstantLogBlocks() {
-    return currentInstantLogBlocks;
-  }
-
-  public List<String> getValidBlockInstants() {
-    return validBlockInstants;
-  }
-
   private Pair<ClosableIterator<HoodieRecord>, HoodieSchema> 
getRecordsIterator(
       HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws 
IOException {
     ClosableIterator<HoodieRecord> blockRecordsIterator;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 332a9c920c6e..159662f73bb9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -43,8 +43,9 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -74,9 +75,9 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  *
  * @param <T> type of engine-specific record representation.
  */
+@Slf4j
 public abstract class BaseHoodieLogRecordReader<T> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(BaseHoodieLogRecordReader.class);
   public static final String LOG_BLOCK_FULL_READ_DURATION_IN_MILLIS = 
"logBlockFullReadDurationInMillis";
   public static final String BLOCK_SIZE_IN_BYTES = "blockSizeInBytes";
   public static final String TOTAL_RECORDS_PRESENT_IN_LOG_BLOCK = 
"totalRecordsPresentInLogBlock";
@@ -89,13 +90,16 @@ public abstract class BaseHoodieLogRecordReader<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final HoodieTableMetaClient hoodieTableMetaClient;
   // Merge strategy to use when combining records from log
+  @Getter(AccessLevel.PROTECTED)
   private final String payloadClassFQN;
   // Record's key/partition-path fields
   private final String recordKeyField;
   // Partition name override
+  @Getter
   private final Option<String> partitionNameOverrideOpt;
   // Ordering fields
   protected final String orderingFields;
+  @Getter(AccessLevel.PROTECTED)
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List<HoodieLogFile> logFiles;
@@ -107,6 +111,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // optional instant range for incremental block filtering
   private final Option<InstantRange> instantRange;
   // Read the operation metadata field from the avro record
+  @Getter
   private final boolean withOperationField;
   // FileSystem
   private final HoodieStorage storage;
@@ -129,15 +134,18 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // Scan duration in milliseconds
   private AtomicLong blocksScanDuration = new AtomicLong(0);
   // Store the last instant log blocks (needed to implement rollback)
+  @Getter
   private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
   // Enables full scan of log records
   protected final boolean forceFullScan;
   // Progress
+  @Getter
   private float progress = 0.0f;
-  // Record type read from log block
   // Collect all the block instants after scanning all the log files.
+  @Getter
   private final List<String> validBlockInstants = new ArrayList<>();
   // Block-level scan stats for processed data blocks.
+  @Getter
   private List<Map<String, Object>> blocksStats = new ArrayList<>();
   protected HoodieFileGroupRecordBuffer<T> recordBuffer;
   // Allows to consider inflight instants while merging log records
@@ -271,7 +279,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
        */
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.debug("Scanning log file {}", logFile);
+        log.debug("Scanning log file {}", logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
@@ -279,11 +287,11 @@ public abstract class BaseHoodieLogRecordReader<T> {
         logBlock.getBlockContentLocation()
             .map(contentLocation -> 
totalLogBlocksSize.addAndGet(contentLocation.getBlockSize()));
         final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        LOG.debug("Scanning log block with instant time {}", instantTime);
+        log.debug("Scanning log block with instant time {}", instantTime);
         totalLogBlocks.incrementAndGet();
         // Ignore the corrupt blocks. No further handling is required for them.
         if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
-          LOG.debug("Found a corrupt block in {}", logFile.getPath());
+          log.debug("Found a corrupt block in {}", logFile.getPath());
           totalCorruptBlocks.incrementAndGet();
           continue;
         }
@@ -314,7 +322,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
           case DELETE_BLOCK:
-            LOG.debug("Reading a {} block with instant time {}",
+            log.debug("Reading a {} block with instant time {}",
                 logBlock.getBlockType() == 
HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK ? "delete" : "data",
                 instantTime);
             List<HoodieLogBlock> logBlocksList = 
instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
@@ -326,7 +334,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
             instantToBlocksMap.put(instantTime, logBlocksList);
             break;
           case COMMAND_BLOCK:
-            LOG.debug("Reading a command block from file {}", 
logFile.getPath());
+            log.debug("Reading a command block from file {}", 
logFile.getPath());
             // This is a command block - take appropriate action based on the 
command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
 
@@ -341,10 +349,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
               if (rolledBackBlocks != null) {
                 numBlocksRolledBack += rolledBackBlocks.size();
               }
-              LOG.debug("Reading a rollback block with instant {} and target 
instant {}",
+              log.debug("Reading a rollback block with instant {} and target 
instant {}",
                   instantTime, targetInstantForCommandBlock);
             } else {
-              LOG.error("Reading a command block with instant {} whose 
operation is not supported", instantTime);
+              log.error("Reading a command block with instant {} whose 
operation is not supported", instantTime);
               throw new UnsupportedOperationException("Command type not yet 
supported.");
             }
             break;
@@ -353,8 +361,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
         }
       }
 
-      LOG.info("Ordered instant times seen {}", orderedInstantsList);
-      LOG.info("Targeted instants that are rolled back are {}", 
targetRollbackInstants);
+      log.info("Ordered instant times seen {}", orderedInstantsList);
+      log.info("Targeted instants that are rolled back are {}", 
targetRollbackInstants);
 
       // All the block's instants time that are added to the queue are 
collected in this set.
       Set<String> instantTimesIncluded = new HashSet<>();
@@ -377,7 +385,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
         // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under 
its headers.
         if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) 
{
-          LOG.debug("For instant time {}, compacted block instants are {}",
+          log.debug("For instant time {}, compacted block instants are {}",
               instantTime, 
firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES));
           // When compacted blocks are seen update the 
blockTimeToCompactionBlockTimeMap.
           
Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
@@ -410,20 +418,20 @@ public abstract class BaseHoodieLogRecordReader<T> {
         }
       }
       Collections.reverse(validBlockInstants);
-      LOG.debug("Number of applied rollback blocks {}", numBlocksRolledBack);
-      LOG.info("Total valid instants found are {}. Instants are {}", 
validBlockInstants.size(), validBlockInstants);
+      log.debug("Number of applied rollback blocks {}", numBlocksRolledBack);
+      log.info("Total valid instants found are {}. Instants are {}", 
validBlockInstants.size(), validBlockInstants);
       if (ignoredBlockCount > 0) {
-        LOG.info("Ignored {} log blocks from {} instants not in the range: 
{}", ignoredBlockCount, ignoredInstants.size(), ignoredInstants);
+        log.info("Ignored {} log blocks from {} instants not in the range: 
{}", ignoredBlockCount, ignoredInstants.size(), ignoredInstants);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Final view of the Block time to compactionBlockMap {}", 
blockTimeToCompactionBlockTimeMap);
+      if (log.isDebugEnabled()) {
+        log.debug("Final view of the Block time to compactionBlockMap {}", 
blockTimeToCompactionBlockTimeMap);
       }
       totalValidLogBlocks.set(currentInstantLogBlocks.size());
       blocksScanDuration.set(scanTimer.endTimer());
 
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
-        LOG.debug("Merging the final data blocks");
+        log.debug("Merging the final data blocks");
         processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
       }
       // Done
@@ -432,10 +440,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
         totalLogRecords.set(recordBuffer.getTotalLogRecords());
       }
     } catch (IOException e) {
-      LOG.error("Got IOException when reading log file", e);
+      log.error("Got IOException when reading log file", e);
       throw new HoodieIOException("IOException when reading log file ", e);
     } catch (Exception e) {
-      LOG.error("Got exception when reading log file", e);
+      log.error("Got exception when reading log file", e);
       throw new HoodieException("Exception when reading log file ", e);
     } finally {
       try {
@@ -444,18 +452,18 @@ public abstract class BaseHoodieLogRecordReader<T> {
         }
       } catch (IOException ioe) {
         // Eat exception as we do not want to mask the original exception that 
can happen
-        LOG.error("Unable to close log format reader", ioe);
+        log.error("Unable to close log format reader", ioe);
       }
       if (!logFiles.isEmpty()) {
         try {
           StoragePath path = logFiles.get(0).getPath();
-          LOG.info("Finished scanning log files. FileId: {}, 
LogFileInstantTime: {}, "
+          log.info("Finished scanning log files. FileId: {}, 
LogFileInstantTime: {}, "
                   + "Total log files: {}, Total log blocks: {}, Total 
rollbacks: {}, Total corrupt blocks: {}",
               FSUtils.getFileIdFromLogPath(path), 
FSUtils.getDeltaCommitTimeFromLogPath(path),
               totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(), 
totalCorruptBlocks.get());
         } catch (Exception e) {
-          LOG.warn("Could not extract fileId from log path", e);
-          LOG.info("Finished scanning log files. "
+          log.warn("Could not extract fileId from log path", e);
+          log.info("Finished scanning log files. "
                   + "Total log files: {}, Total log blocks: {}, Total 
rollbacks: {}, Total corrupt blocks: {}",
               totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(), 
totalCorruptBlocks.get());
         }
@@ -469,7 +477,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, 
int numLogFilesSeen,
                                              Option<KeySpec> keySpecOpt) 
throws Exception {
     while (!logBlocks.isEmpty()) {
-      LOG.debug("Number of remaining logblocks to merge {}", logBlocks.size());
+      log.debug("Number of remaining logblocks to merge {}", logBlocks.size());
       // poll the element at the bottom of the stack since that's the order it 
was inserted
       HoodieLogBlock lastBlock = logBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
@@ -482,7 +490,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
           recordBuffer.processDeleteBlock((HoodieDeleteBlock) lastBlock);
           break;
         case CORRUPT_BLOCK:
-          LOG.warn("Found a corrupt block which was not rolled back");
+          log.warn("Found a corrupt block which was not rolled back");
           break;
         default:
           break;
@@ -494,7 +502,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
   private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> 
keySpecOpt) throws IOException {
     String blockInstantTime = dataBlock.getLogBlockHeader().get(INSTANT_TIME);
-    LOG.debug("Processing log block with instant time {}", blockInstantTime);
+    log.debug("Processing log block with instant time {}", blockInstantTime);
     long totalLogRecordsBefore = recordBuffer != null ? 
recordBuffer.getTotalLogRecords() : 0L;
     HoodieTimer blockReadTimer = HoodieTimer.start();
     recordBuffer.processDataBlock(dataBlock, keySpecOpt);
@@ -507,7 +515,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
         .map(contentLocation -> blockReadMetrics.put(BLOCK_SIZE_IN_BYTES, 
contentLocation.getBlockSize()));
     
blockReadMetrics.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME.toString(), 
blockInstantTime);
     blocksStats.add(blockReadMetrics);
-    LOG.debug("For log block, scan metrics are {}", blockReadMetrics);
+    log.debug("For log block, scan metrics are {}", blockReadMetrics);
   }
 
   private boolean shouldLookupRecords() {
@@ -516,13 +524,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
     return !forceFullScan;
   }
 
-  /**
-   * Return progress of scanning as a float between 0.0 to 1.0.
-   */
-  public float getProgress() {
-    return progress;
-  }
-
   public long getTotalLogFiles() {
     return totalLogFiles.get();
   }
@@ -543,22 +544,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
     return totalValidLogBlocks.get();
   }
 
-  public List<Map<String, Object>> getBlocksStats() {
-    return blocksStats;
-  }
-
   public long getBlocksScanDuration() {
     return blocksScanDuration.get();
   }
 
-  protected String getPayloadClassFQN() {
-    return payloadClassFQN;
-  }
-
-  public Option<String> getPartitionNameOverride() {
-    return partitionNameOverrideOpt;
-  }
-
   public long getTotalRollbacks() {
     return totalRollbacks.get();
   }
@@ -567,22 +556,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
     return totalCorruptBlocks.get();
   }
 
-  public boolean isWithOperationField() {
-    return withOperationField;
-  }
-
-  protected TypedProperties getPayloadProps() {
-    return payloadProps;
-  }
-
-  public Deque<HoodieLogBlock> getCurrentInstantLogBlocks() {
-    return currentInstantLogBlocks;
-  }
-
-  public List<String> getValidBlockInstants() {
-    return validBlockInstants;
-  }
-
   /**
    * Builder used to build {@code AbstractHoodieLogRecordScanner}.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
index ede7918649b4..a186abd73e9a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.table.log;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 import java.util.List;
 
 /**
@@ -26,17 +29,11 @@ import java.util.List;
  * That is, the comparison between a record key and an element
  * of the set is {@link String#equals}.
  */
+@AllArgsConstructor
+@Getter
 public class FullKeySpec implements KeySpec {
-  private final List<String> keys;
-
-  public FullKeySpec(List<String> keys) {
-    this.keys = keys;
-  }
 
-  @Override
-  public List<String> getKeys() {
-    return keys;
-  }
+  private final List<String> keys;
 
   @Override
   public boolean isFullKey() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 2c1d91e7b7c1..03ae4a2c4c07 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -44,8 +44,8 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StorageSchemes;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nullable;
 
@@ -63,14 +63,15 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  * Scans a log file and provides block level iterator on the log file Loads 
the entire block contents in memory Can emit
  * either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is 
found).
  */
+@Slf4j
 public class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
-  private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogFileReader.class);
+  private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024;
   private static final String REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED = 
"Reverse log reader has not been enabled";
 
   private final HoodieStorage storage;
+  @Getter
   private final HoodieLogFile logFile;
   private final int bufferSize;
   private final byte[] magicBuffer = new byte[6];
@@ -118,11 +119,6 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     }
   }
 
-  @Override
-  public HoodieLogFile getLogFile() {
-    return logFile;
-  }
-
   // TODO : convert content and block length to long by using ByteBuffer, raw 
byte [] allows
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
@@ -244,12 +240,12 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   }
 
   private HoodieLogBlock createCorruptBlock(long blockStartPos) throws 
IOException {
-    LOG.info("Log {} has a corrupted block at {}", logFile, blockStartPos);
+    log.info("Log {} has a corrupted block at {}", logFile, blockStartPos);
     inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the 
nextBlockOffset
     inputStream.seek(blockStartPos);
-    LOG.info("Next available block in {} starts at {}", logFile, 
nextBlockOffset);
+    log.info("Next available block in {} starts at {}", logFile, 
nextBlockOffset);
     int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, 
corruptedBlockSize, true);
@@ -276,7 +272,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
       // So we have to shorten the footer block size by the size of magic hash
       blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
     } catch (EOFException e) {
-      LOG.info("Found corrupted block in file {} with block size({}) running 
past EOF", logFile, blocksize);
+      log.info("Found corrupted block in file {} with block size({}) running 
past EOF", logFile, blocksize);
       // this is corrupt
       // This seek is required because contract of seek() is different for 
naked DFSInputStream vs BufferedFSInputStream
       // release-3.1.0-RC1/DFSInputStream.java#L1455
@@ -286,7 +282,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     }
 
     if (blocksize != blockSizeFromFooter) {
-      LOG.info("Found corrupted block in file {}. Header block size({}) did 
not match the footer block size({})", logFile, blocksize, blockSizeFromFooter);
+      log.info("Found corrupted block in file {}. Header block size({}) did 
not match the footer block size({})", logFile, blocksize, blockSizeFromFooter);
       inputStream.seek(currentPos);
       return true;
     }
@@ -297,7 +293,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
       return false;
     } catch (CorruptedLogFileException e) {
       // This is a corrupted block
-      LOG.info("Found corrupted block in file {}. No magic hash found right 
after footer block size entry", logFile);
+      log.info("Found corrupted block in file {}. No magic hash found right 
after footer block size entry", logFile);
       return true;
     } finally {
       inputStream.seek(currentPos);
@@ -330,7 +326,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   @Override
   public void close() throws IOException {
     if (!closed) {
-      LOG.info("Closing Log file reader {}",  logFile.getFileName());
+      log.info("Closing Log file reader {}",  logFile.getFileName());
       if (null != this.inputStream) {
         this.inputStream.close();
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index a675e0d9da89..3877a682240d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -246,18 +248,12 @@ public interface HoodieLogFormat {
    * A set of feature flags associated with a log format. Versions are changed 
when the log format changes. TODO(na) -
    * Implement policies around major/minor versions
    */
+  @AllArgsConstructor(access = AccessLevel.PACKAGE)
+  @Getter
   abstract class LogFormatVersion {
 
     private final int version;
 
-    LogFormatVersion(int version) {
-      this.version = version;
-    }
-
-    public int getVersion() {
-      return version;
-    }
-
     public abstract boolean hasMagicHeader();
 
     public abstract boolean hasContent();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 77c3e78fcc32..1c342fecf39c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -25,8 +25,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.List;
@@ -34,6 +33,7 @@ import java.util.List;
 /**
  * Hoodie log format reader.
  */
+@Slf4j
 public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
 
   private final List<HoodieLogFile> logFiles;
@@ -45,8 +45,6 @@ public class HoodieLogFormatReader implements 
HoodieLogFormat.Reader {
   private final boolean enableInlineReading;
   private final int bufferSize;
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogFormatReader.class);
-
   HoodieLogFormatReader(HoodieStorage storage, List<HoodieLogFile> logFiles, 
HoodieSchema readerSchema,
                         boolean reverseLogReader, int bufferSize, boolean 
enableRecordLookups,
                         String recordKeyField, InternalSchema internalSchema) 
throws IOException {
@@ -91,7 +89,7 @@ public class HoodieLogFormatReader implements 
HoodieLogFormat.Reader {
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file 
", io);
       }
-      LOG.debug("Moving to the next reader for logfile {}", 
currentReader.getLogFile());
+      log.debug("Moving to the next reader for logfile {}", 
currentReader.getLogFile());
       return hasNext();
     }
     return false;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index b3f1eeaa988e..1ee8e7560d05 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -34,8 +34,8 @@ import org.apache.hudi.expression.Predicates;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
 import java.io.Serializable;
@@ -52,9 +52,11 @@ import static 
org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
  *
  * @param <T> type of engine-specific record representation.
  */
+@Getter
+@Slf4j
 public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     implements Iterable<BufferedRecord<T>>, Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
+
   // A timer for calculating elapsed time in millis
   public final HoodieTimer timer = HoodieTimer.create();
   // count of merged records in log
@@ -102,8 +104,8 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
     this.numMergedRecordsInLog = recordBuffer.size();
 
-    LOG.info("Number of log files scanned => {}", logFiles.size());
-    LOG.info("Number of entries in Map => {}", recordBuffer.size());
+    log.info("Number of log files scanned => {}", logFiles.size());
+    log.info("Number of entries in Map => {}", recordBuffer.size());
   }
 
   static Option<KeySpec> createKeySpec(Option<Predicate> filter) {
@@ -134,10 +136,6 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     return recordBuffer.getLogRecords();
   }
 
-  public long getNumMergedRecordsInLog() {
-    return numMergedRecordsInLog;
-  }
-
   /**
    * Returns the builder for {@code HoodieMergedLogRecordReader}.
    */
@@ -145,10 +143,6 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     return new Builder<>();
   }
 
-  public long getTotalTimeTakenToReadAndMergeBlocks() {
-    return totalTimeTakenToReadAndMergeBlocks;
-  }
-
   @Override
   public void close() {
     // No op.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 318533859746..f7521fc207c5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,8 +47,8 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
@@ -81,10 +81,10 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument;
  * This results in two I/O passes over the log file.
  */
 @NotThreadSafe
+@Slf4j
 public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
     implements Iterable<HoodieRecord>, Closeable {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
   // A timer for calculating elapsed time in millis
   public final HoodieTimer timer = HoodieTimer.create();
   // Map of compacted/merged records
@@ -92,9 +92,11 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
   // Set of already scanned prefixes allowing us to avoid scanning same 
prefixes again
   private final Set<String> scannedPrefixes;
   // count of merged records in log
+  @Getter
   private long numMergedRecordsInLog;
   private final long maxMemorySizeInBytes;
   // Stores the total time taken to perform reading and merging of log blocks
+  @Getter
   private long totalTimeTakenToReadAndMergeBlocks;
   private final String[] orderingFields;
   private final DeleteContext deleteContext;
@@ -220,8 +222,8 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
     this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
     this.numMergedRecordsInLog = records.size();
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Scanned {} log files with stats: MaxMemoryInBytes => {}, 
MemoryBasedMap => {} entries, {} total bytes, DiskBasedMap => {} entries, {} 
total bytes",
+    if (log.isInfoEnabled()) {
+      log.info("Scanned {} log files with stats: MaxMemoryInBytes => {}, 
MemoryBasedMap => {} entries, {} total bytes, DiskBasedMap => {} entries, {} 
total bytes",
           logFilePaths.size(), maxMemorySizeInBytes, 
records.getInMemoryMapNumEntries(), records.getCurrentInMemoryMapSize(),
           records.getDiskBasedMapNumEntries(), 
records.getSizeOfFileOnDiskInBytes());
     }
@@ -240,10 +242,6 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
     return recordMerger.getRecordType();
   }
 
-  public long getNumMergedRecordsInLog() {
-    return numMergedRecordsInLog;
-  }
-
   /**
    * Returns the builder for {@code HoodieMergedLogRecordScanner}.
    */
@@ -313,10 +311,6 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
     }
   }
 
-  public long getTotalTimeTakenToReadAndMergeBlocks() {
-    return totalTimeTakenToReadAndMergeBlocks;
-  }
-
   @Override
   public void close() {
     if (records != null) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 9dd56cc66182..eda285d478d8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -21,6 +21,10 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,6 +41,7 @@ import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
 /**
  * An instant range used for incremental reader filtering.
  */
+@Getter
 public abstract class InstantRange implements Serializable {
   private static final long serialVersionUID = 1L;
 
@@ -55,14 +60,6 @@ public abstract class InstantRange implements Serializable {
     return new Builder();
   }
 
-  public Option<String> getStartInstant() {
-    return startInstantOpt;
-  }
-
-  public Option<String> getEndInstant() {
-    return endInstantOpt;
-  }
-
   public abstract boolean isInRange(String instant);
 
   public abstract RangeType getRangeType();
@@ -248,6 +245,7 @@ public abstract class InstantRange implements Serializable {
   /**
    * Builder for {@link InstantRange}.
    */
+  @NoArgsConstructor(access = AccessLevel.PRIVATE)
   public static class Builder {
     private String startInstant;
     private String endInstant;
@@ -256,9 +254,6 @@ public abstract class InstantRange implements Serializable {
     private Set<String> explicitInstants;
     private List<InstantRange> instantRanges;
 
-    private Builder() {
-    }
-
     public Builder startInstant(String startInstant) {
       this.startInstant = startInstant;
       return this;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
index 965c57309b65..ea4c25db661a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 
+import lombok.Getter;
+
 import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +32,7 @@ import java.util.function.Supplier;
 /**
  * Command block issues a specific command to the scanner.
  */
+@Getter
 public class HoodieCommandBlock extends HoodieLogBlock {
 
   private final HoodieCommandBlockTypeEnum type;
@@ -53,10 +56,6 @@ public class HoodieCommandBlock extends HoodieLogBlock {
         
HoodieCommandBlockTypeEnum.values()[Integer.parseInt(header.get(HeaderMetadataType.COMMAND_BLOCK_TYPE))];
   }
 
-  public HoodieCommandBlockTypeEnum getType() {
-    return type;
-  }
-
   @Override
   public HoodieLogBlockType getBlockType() {
     return HoodieLogBlockType.COMMAND_BLOCK;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 70ac694dedfe..dff48985f646 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -30,8 +30,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -57,8 +57,8 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
+@Slf4j
 public abstract class HoodieDataBlock extends HoodieLogBlock {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDataBlock.class);
 
   // TODO rebase records/content to leverage Either to warrant
   //      that they are mutex (used by read/write flows respectively)
@@ -67,6 +67,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   /**
    * Key field's name w/in the record's schema
    */
+  @Getter
   private final String keyFieldName;
 
   private final boolean enablePointLookups;
@@ -132,10 +133,6 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
     return serializeRecords(records.get(), storage);
   }
 
-  public String getKeyFieldName() {
-    return keyFieldName;
-  }
-
   public boolean containsPartialUpdates() {
     return getLogBlockHeader().containsKey(HeaderMetadataType.IS_PARTIAL)
         && 
Boolean.parseBoolean(getLogBlockHeader().get(HeaderMetadataType.IS_PARTIAL));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 54e7f301e6a8..0d69dab14258 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -32,6 +32,7 @@ import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.util.Lazy;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
@@ -41,8 +42,6 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -62,8 +61,9 @@ import static 
org.apache.hudi.avro.HoodieAvroWrapperUtils.wrapValueIntoAvro;
 /**
  * Delete block contains a list of keys to be deleted from scanning the blocks 
so far.
  */
+@Slf4j
 public class HoodieDeleteBlock extends HoodieLogBlock {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDeleteBlock.class);
+
   /**
    * These static builders are added to avoid performance issue in Avro 1.10.
    * You can find more details in HoodieAvroUtils, HUDI-3834, and AVRO-3048.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index bc80e279a194..95267b154ba5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -38,6 +38,7 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.IndexedRecord;
 
 import java.io.ByteArrayOutputStream;
@@ -56,6 +57,7 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  * HoodieHFileDataBlock contains a list of records stored inside an HFile 
format. It is used with the HFile
  * base file format.
  */
+@Slf4j
 public class HoodieHFileDataBlock extends HoodieDataBlock {
   private final Map<String, String> writerParams;
   // This path is used for constructing HFile reader context, which should not 
be
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 554b83b3455d..0b703ed7ac91 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -28,9 +28,11 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -55,8 +57,11 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
 /**
  * Abstract class defining a block in HoodieLogFile.
  */
+@AllArgsConstructor
+@Getter
+@Slf4j
 public abstract class HoodieLogBlock {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogBlock.class);
+
   /**
    * The current version of the log block. Anytime the logBlock format changes 
this version needs to be bumped and
    * corresponding changes need to be made to {@link HoodieLogBlockVersion} 
TODO : Change this to a class, something
@@ -65,32 +70,24 @@ public abstract class HoodieLogBlock {
    */
   public static int version = 3;
   // Header for each log block
+  @Nonnull
   private final Map<HeaderMetadataType, String> logBlockHeader;
   // Footer for each log block
+  @Nonnull
   private final Map<FooterMetadataType, String> logBlockFooter;
   // Location of a log block on disk
+  @Nonnull
   private final Option<HoodieLogBlockContentLocation> blockContentLocation;
   // data for a specific block
+  @Nonnull
   private Option<byte[]> content;
+  @Getter(AccessLevel.PROTECTED)
+  @Nullable
   private final Supplier<SeekableDataInputStream> inputStreamSupplier;
   // Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory 
intensive)
+  @Getter(AccessLevel.NONE)
   protected boolean readBlockLazily;
 
-  public HoodieLogBlock(
-      @Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-      @Nonnull Map<FooterMetadataType, String> logBlockFooter,
-      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation,
-      @Nonnull Option<byte[]> content,
-      @Nullable Supplier<SeekableDataInputStream> inputStreamSupplier,
-      boolean readBlockLazily) {
-    this.logBlockHeader = logBlockHeader;
-    this.logBlockFooter = logBlockFooter;
-    this.blockContentLocation = blockContentLocation;
-    this.content = content;
-    this.inputStreamSupplier = inputStreamSupplier;
-    this.readBlockLazily = readBlockLazily;
-  }
-
   // Return the bytes representation of the data belonging to a LogBlock
   public ByteArrayOutputStream getContentBytes(HoodieStorage storage) throws 
IOException {
     throw new HoodieException("No implementation was provided");
@@ -110,22 +107,6 @@ public abstract class HoodieLogBlock {
     throw new HoodieException("No implementation was provided");
   }
 
-  public Option<HoodieLogBlockContentLocation> getBlockContentLocation() {
-    return this.blockContentLocation;
-  }
-
-  public Map<HeaderMetadataType, String> getLogBlockHeader() {
-    return logBlockHeader;
-  }
-
-  public Map<FooterMetadataType, String> getLogBlockFooter() {
-    return logBlockFooter;
-  }
-
-  public Option<byte[]> getContent() {
-    return content;
-  }
-
   /**
    * Compacted blocks are created using log compaction which basically merges 
the consecutive blocks together and create
    * huge block with all the changes.
@@ -161,10 +142,10 @@ public abstract class HoodieLogBlock {
       try {
         logBlockHeader.put(HeaderMetadataType.RECORD_POSITIONS, 
LogReaderUtils.encodePositions(positionSet));
       } catch (IOException e) {
-        LOG.error("Cannot write record positions to the log block header.", e);
+        log.error("Cannot write record positions to the log block header.", e);
       }
     } else {
-      LOG.warn("There are duplicate keys in the records (number of unique 
positions: {}, "
+      log.warn("There are duplicate keys in the records (number of unique 
positions: {}, "
               + "number of records: {}). Skip writing record positions to the 
log block header.",
           positionSet.size(), numRecords);
     }
@@ -176,7 +157,7 @@ public abstract class HoodieLogBlock {
   }
 
   protected void removeBaseFileInstantTimeOfPositions() {
-    LOG.info("There are records without valid positions. "
+    log.info("There are records without valid positions. "
         + "Skip writing record positions to the block header.");
     
logBlockHeader.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
   }
@@ -252,7 +233,10 @@ public abstract class HoodieLogBlock {
    * This class is used to store the Location of the Content of a Log Block. 
It's used when a client chooses for a IO
    * intensive CompactedScanner, the location helps to lazily read contents 
from the log file
    */
+  @AllArgsConstructor
+  @Getter
   public static final class HoodieLogBlockContentLocation {
+
     // Storage Config required to access the file
     private final HoodieStorage storage;
     // The logFile that contains this block
@@ -263,38 +247,6 @@ public abstract class HoodieLogBlock {
     private final long blockSize;
     // The final position where the complete block ends
     private final long blockEndPos;
-
-    public HoodieLogBlockContentLocation(HoodieStorage storage,
-                                         HoodieLogFile logFile,
-                                         long contentPositionInLogFile,
-                                         long blockSize,
-                                         long blockEndPos) {
-      this.storage = storage;
-      this.logFile = logFile;
-      this.contentPositionInLogFile = contentPositionInLogFile;
-      this.blockSize = blockSize;
-      this.blockEndPos = blockEndPos;
-    }
-
-    public HoodieStorage getStorage() {
-      return storage;
-    }
-
-    public HoodieLogFile getLogFile() {
-      return logFile;
-    }
-
-    public long getContentPositionInLogFile() {
-      return contentPositionInLogFile;
-    }
-
-    public long getBlockSize() {
-      return blockSize;
-    }
-
-    public long getBlockEndPos() {
-      return blockEndPos;
-    }
   }
 
   /**
@@ -359,10 +311,6 @@ public abstract class HoodieLogBlock {
     return Option.of(baos);
   }
 
-  protected Supplier<SeekableDataInputStream> getInputStreamSupplier() {
-    return inputStreamSupplier;
-  }
-
   /**
    * Adds the record positions if the base file instant time of the positions 
exists
    * in the log header and the record positions are all valid.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index d68e608463df..fb78adaa772c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -23,6 +23,10 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.OrderingValues;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -34,41 +38,22 @@ import java.util.function.UnaryOperator;
  *
  * @param <T> The type of the engine specific row.
  */
+@AllArgsConstructor
+@Getter
 public class BufferedRecord<T> implements Serializable {
+
   private String recordKey;
-  private T record;
   private final Comparable orderingValue;
+  private T record;
   private final Integer schemaId;
-  @Nullable private HoodieOperation hoodieOperation;
+  @Nullable
+  @Setter
+  private HoodieOperation hoodieOperation;
 
   public BufferedRecord() {
     this(null, null, null, null, null);
   }
 
-  public BufferedRecord(String recordKey, Comparable orderingValue, T record, 
Integer schemaId, @Nullable HoodieOperation hoodieOperation) {
-    this.recordKey = recordKey;
-    this.orderingValue = orderingValue;
-    this.record = record;
-    this.schemaId = schemaId;
-    this.hoodieOperation = hoodieOperation;
-  }
-
-  public String getRecordKey() {
-    return recordKey;
-  }
-
-  public Comparable getOrderingValue() {
-    return orderingValue;
-  }
-
-  public T getRecord() {
-    return record;
-  }
-
-  public Integer getSchemaId() {
-    return schemaId;
-  }
-
   public boolean isDelete() {
     return HoodieOperation.isDelete(hoodieOperation) || 
HoodieOperation.isUpdateBefore(hoodieOperation);
   }
@@ -81,14 +66,6 @@ public class BufferedRecord<T> implements Serializable {
     return isDelete() && OrderingValues.isDefault(orderingValue);
   }
 
-  public void setHoodieOperation(HoodieOperation hoodieOperation) {
-    this.hoodieOperation = hoodieOperation;
-  }
-
-  public HoodieOperation getHoodieOperation() {
-    return this.hoodieOperation;
-  }
-
   public BufferedRecord<T> toBinary(RecordContext<T> recordContext) {
     if (record != null) {
       HoodieSchema schema = recordContext.getSchemaFromBufferRecord(this);
@@ -124,6 +101,8 @@ public class BufferedRecord<T> implements Serializable {
     return this;
   }
 
+  // Intentionally not using @EqualsAndHashCode: Lombok generates 
instanceof/canEqual based equality,
+  // while this class requires exact runtime-class equality via getClass()
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 81d2de0ac0a2..06e903dd1fa4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -33,16 +33,17 @@ import org.apache.hudi.common.util.OrderingValues;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
 import java.io.IOException;
 
 /**
  * Factory to create a {@link BufferedRecordMerger}.
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class BufferedRecordMergerFactory {
 
-  private BufferedRecordMergerFactory() {
-  }
-
   public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> 
readerContext,
                                                    RecordMergeMode 
recordMergeMode,
                                                    boolean 
enablePartialMerging,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
index ce6857dc2aeb..f1bd8b0b0a8a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/DeleteContext.java
@@ -27,6 +27,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
 import java.io.Serializable;
 import java.util.Properties;
 
@@ -37,10 +40,13 @@ import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 /**
  * Schema context for deletes.
  */
+@Getter
 public class DeleteContext implements Serializable {
+
   private static final long serialVersionUID = 1L;
 
   private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
+  @Accessors(fluent = true)
   private final boolean hasBuiltInDeleteField;
   private int hoodieOperationPos;
   private HoodieSchema readerSchema;
@@ -108,25 +114,9 @@ public class DeleteContext implements Serializable {
         .orElseGet(() -> -1);
   }
 
-  public Option<Pair<String, String>> getCustomDeleteMarkerKeyValue() {
-    return customDeleteMarkerKeyValue;
-  }
-
-  public boolean hasBuiltInDeleteField() {
-    return hasBuiltInDeleteField;
-  }
-
-  public int getHoodieOperationPos() {
-    return hoodieOperationPos;
-  }
-
   public DeleteContext withReaderSchema(HoodieSchema readerSchema) {
     this.readerSchema = readerSchema;
     this.hoodieOperationPos = getHoodieOperationPos(readerSchema);
     return this;
   }
-
-  public HoodieSchema getReaderSchema() {
-    return readerSchema;
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 01f3e513e341..da0bfb567abe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -32,8 +32,11 @@ import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nullable;
 
@@ -108,11 +111,13 @@ import java.util.stream.Stream;
  *
  * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files decoding the commit metadata are already 
cleaned.
  */
+@Slf4j
 public class IncrementalQueryAnalyzer {
+
   public static final String START_COMMIT_EARLIEST = "earliest";
-  private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalQueryAnalyzer.class);
 
   private final HoodieTableMetaClient metaClient;
+  @Getter
   private final Option<String> startCompletionTime;
   private final Option<String> endCompletionTime;
   private final InstantRange.RangeType rangeType;
@@ -143,10 +148,6 @@ public class IncrementalQueryAnalyzer {
     this.limit = limit;
   }
 
-  public Option<String> getStartCompletionTime() {
-    return startCompletionTime;
-  }
-
   /**
    * Returns a builder.
    */
@@ -206,7 +207,7 @@ public class IncrementalQueryAnalyzer {
       String endInstant = endCompletionTime.isEmpty() ? null : lastInstant;
       return QueryContext.create(startInstant, endInstant, instants, 
archivedInstants, activeInstants, filteredTimeline, archivedReadTimeline);
     } catch (Exception ex) {
-      LOG.error("Got exception when generating incremental query info", ex);
+      log.error("Got exception when generating incremental query info", ex);
       throw new HoodieException(ex);
     }
   }
@@ -284,6 +285,7 @@ public class IncrementalQueryAnalyzer {
   /**
    * Builder for {@link IncrementalQueryAnalyzer}.
    */
+  @NoArgsConstructor
   public static class Builder {
     /**
      * Start completion time.
@@ -304,9 +306,6 @@ public class IncrementalQueryAnalyzer {
      */
     private int limit = -1;
 
-    public Builder() {
-    }
-
     public Builder startCompletionTime(String startCompletionTime) {
       this.startCompletionTime = startCompletionTime;
       return this;
@@ -361,9 +360,12 @@ public class IncrementalQueryAnalyzer {
   /**
    * Represents the analyzed query context.
    */
+  @AllArgsConstructor(access = AccessLevel.PRIVATE)
+  @Getter
   public static class QueryContext {
+
     public static final QueryContext EMPTY =
-        new QueryContext(null, null, Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList(), null, null);
+        new QueryContext(Option.empty(), Option.empty(), 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 
null, null);
 
     /**
      * An empty option indicates consumption from the earliest instant.
@@ -373,6 +375,8 @@ public class IncrementalQueryAnalyzer {
      * An empty option indicates consumption to the latest instant.
      */
     private final Option<String> endInstant;
+    @Getter(AccessLevel.NONE)
+    private final List<String> instants;
     private final List<HoodieInstant> archivedInstants;
     private final List<HoodieInstant> activeInstants;
     /**
@@ -382,25 +386,9 @@ public class IncrementalQueryAnalyzer {
     /**
      * The archived timeline to read filtered by given configurations.
      */
+    @Nullable
+    @Getter(AccessLevel.NONE)
     private final HoodieTimeline archivedTimeline;
-    private final List<String> instants;
-
-    private QueryContext(
-        @Nullable String startInstant,
-        @Nullable String endInstant,
-        List<String> instants,
-        List<HoodieInstant> archivedInstants,
-        List<HoodieInstant> activeInstants,
-        HoodieTimeline activeTimeline,
-        @Nullable HoodieTimeline archivedTimeline) {
-      this.startInstant = Option.ofNullable(startInstant);
-      this.endInstant = Option.ofNullable(endInstant);
-      this.archivedInstants = archivedInstants;
-      this.activeInstants = activeInstants;
-      this.activeTimeline = activeTimeline;
-      this.archivedTimeline = archivedTimeline;
-      this.instants = instants;
-    }
 
     public static QueryContext create(
         @Nullable String startInstant,
@@ -410,7 +398,7 @@ public class IncrementalQueryAnalyzer {
         List<HoodieInstant> activeInstants,
         HoodieTimeline activeTimeline,
         @Nullable HoodieTimeline archivedTimeline) {
-      return new QueryContext(startInstant, endInstant, instants, 
archivedInstants, activeInstants, activeTimeline, archivedTimeline);
+      return new QueryContext(Option.ofNullable(startInstant), 
Option.ofNullable(endInstant), instants, archivedInstants, activeInstants, 
activeTimeline, archivedTimeline);
     }
 
     public boolean isEmpty() {
@@ -421,14 +409,6 @@ public class IncrementalQueryAnalyzer {
       return this.instants;
     }
 
-    public Option<String> getStartInstant() {
-      return startInstant;
-    }
-
-    public Option<String> getEndInstant() {
-      return endInstant;
-    }
-
     /**
      * Returns the latest instant time which should be included physically in 
reading.
      */
@@ -441,14 +421,6 @@ public class IncrementalQueryAnalyzer {
       return Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).collect(Collectors.toList());
     }
 
-    public List<HoodieInstant> getArchivedInstants() {
-      return archivedInstants;
-    }
-
-    public List<HoodieInstant> getActiveInstants() {
-      return activeInstants;
-    }
-
     public boolean isConsumingFromEarliest() {
       return startInstant.isEmpty();
     }
@@ -489,10 +461,6 @@ public class IncrementalQueryAnalyzer {
       }
     }
 
-    public HoodieTimeline getActiveTimeline() {
-      return this.activeTimeline;
-    }
-
     public @Nullable HoodieTimeline getArchivedTimeline() {
       return archivedTimeline;
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index e84921c080ae..e8ff9add0ad8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -33,6 +33,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
 import java.util.List;
 
 /**
@@ -40,16 +43,15 @@ import java.util.List;
  *
  * @param <T> the engine specific record type
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoader implements FileGroupRecordBufferLoader<T> {
+
   private static final DefaultFileGroupRecordBufferLoader INSTANCE = new 
DefaultFileGroupRecordBufferLoader<>();
 
   static <T> DefaultFileGroupRecordBufferLoader<T> getInstance() {
     return INSTANCE;
   }
 
-  private DefaultFileGroupRecordBufferLoader() {
-  }
-
   @Override
   public Pair<HoodieFileGroupRecordBuffer<T>, List<String>> 
getRecordBuffer(HoodieReaderContext<T> readerContext,
                                                                             
HoodieStorage storage,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index 30f872421057..8bd1013f6643 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -50,6 +50,9 @@ import 
org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
 import org.apache.hudi.io.util.FileIOUtils;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
@@ -77,8 +80,10 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
   protected final Option<Pair<String, String>> payloadClasses;
   protected final TypedProperties props;
   protected final ExternalSpillableMap<Serializable, BufferedRecord<T>> 
records;
+  @Getter
   protected final DeleteContext deleteContext;
   protected final BufferedRecordConverter<T> bufferedRecordConverter;
+  @Setter
   protected ClosableIterator<T> baseFileIterator;
   protected UpdateProcessor<T> updateProcessor;
   protected Iterator<BufferedRecord<T>> logRecordIterator;
@@ -87,6 +92,7 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
   protected InternalSchema internalSchema;
   protected HoodieTableMetaClient hoodieTableMetaClient;
   protected BufferedRecordMerger<T> bufferedRecordMerger;
+  @Getter
   protected long totalLogRecords = 0;
 
   protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
@@ -131,15 +137,6 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
         readerContext.getRecordSizeEstimator(), diskMapType, 
readerContext.getRecordSerializer(), isBitCaskDiskMapCompressionEnabled, 
getClass().getSimpleName());
   }
 
-  @Override
-  public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
-    this.baseFileIterator = baseFileIterator;
-  }
-
-  public DeleteContext getDeleteContext() {
-    return deleteContext;
-  }
-
   /**
    * This allows hasNext() to be called multiple times without incrementing 
the iterator by more than 1
    * record. It does come with the caveat that hasNext() must be called every 
time before next(). But
@@ -169,10 +166,6 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
     return records.size();
   }
 
-  public long getTotalLogRecords() {
-    return totalLogRecords;
-  }
-
   @Override
   public ClosableIterator<BufferedRecord<T>> getLogRecordIterator() {
     return new LogRecordIterator<>(this);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
index 73dd5dda5e87..5eaa54e033c8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
@@ -41,9 +41,8 @@ import 
org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -61,8 +60,8 @@ import java.util.function.Function;
  * Here the position means that record position in the base file. The records 
from the base file is accessed from an iterator object. These records are 
merged when the
  * {@link #hasNext} method is called.
  */
+@Slf4j
 public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupRecordBuffer<T> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(PositionBasedFileGroupRecordBuffer.class);
 
   private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = 
"_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
@@ -96,7 +95,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
     // Extract positions from data block.
     List<Long> recordPositions = extractRecordPositions(dataBlock, 
baseFileInstantTime);
     if (recordPositions == null) {
-      LOG.debug("Falling back to key based merge for data block");
+      log.debug("Falling back to key based merge for data block");
       fallbackToKeyBasedBuffer();
       super.processDataBlock(dataBlock, keySpecOpt);
       return;
@@ -181,7 +180,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
 
     List<Long> recordPositions = extractRecordPositions(deleteBlock, 
baseFileInstantTime);
     if (recordPositions == null) {
-      LOG.debug("Falling back to key based merging for delete block");
+      log.debug("Falling back to key based merging for delete block");
       fallbackToKeyBasedBuffer();
       super.processDeleteBlock(deleteBlock);
       return;
@@ -291,7 +290,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
 
     String blockBaseFileInstantTime = 
logBlock.getBaseFileInstantTimeOfPositions();
     if (StringUtils.isNullOrEmpty(blockBaseFileInstantTime) || 
!baseFileInstantTime.equals(blockBaseFileInstantTime)) {
-      LOG.debug("The record positions cannot be used because the base file 
instant time "
+      log.debug("The record positions cannot be used because the base file 
instant time "
               + "is either missing or different from the base file to merge. "
               + "Instant time in the header: {}, base file instant time of the 
file group: {}.",
           blockBaseFileInstantTime, baseFileInstantTime);
@@ -299,7 +298,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
     }
     Roaring64NavigableMap positions = logBlock.getRecordPositions();
     if (positions == null || positions.isEmpty()) {
-      LOG.info("No record position info is found when attempting to do 
position based merge.");
+      log.info("No record position info is found when attempting to do 
position based merge.");
       return null;
     }
 
@@ -309,7 +308,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
     }
 
     if (blockPositions.isEmpty()) {
-      LOG.info("No positions are extracted.");
+      log.info("No positions are extracted.");
       return null;
     }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
index 7f6771ba3940..57b0a30c4cb1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplitSerializer.java
@@ -103,13 +103,13 @@ public class HoodieSourceSplitSerializer implements 
SimpleVersionedSerializer<Ho
           out.writeBoolean(false);
         }
 
-        out.writeBoolean(instantRange.getStartInstant().isPresent());
-        if (instantRange.getStartInstant().isPresent()) {
-          out.writeUTF(instantRange.getStartInstant().get());
+        out.writeBoolean(instantRange.getStartInstantOpt().isPresent());
+        if (instantRange.getStartInstantOpt().isPresent()) {
+          out.writeUTF(instantRange.getStartInstantOpt().get());
         }
-        out.writeBoolean(instantRange.getEndInstant().isPresent());
-        if (instantRange.getEndInstant().isPresent()) {
-          out.writeUTF(instantRange.getEndInstant().get());
+        out.writeBoolean(instantRange.getEndInstantOpt().isPresent());
+        if (instantRange.getEndInstantOpt().isPresent()) {
+          out.writeUTF(instantRange.getEndInstantOpt().get());
         }
 
         if 
(instantRange.getRangeType().equals(InstantRange.RangeType.EXACT_MATCH)) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index f509013f086b..472c8d74c3c8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -438,11 +438,11 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
     IncrementalInputSplits.Result result = iis.inputSplits(metaClient, 
firstInstant.getCompletionTime(), false);
 
     String minStartCommit = result.getInputSplits().stream()
-            .map(split -> 
split.getInstantRange().get().getStartInstant().get())
+            .map(split -> 
split.getInstantRange().get().getStartInstantOpt().get())
             .min((commit1,commit2) -> compareTimestamps(commit1, LESSER_THAN, 
commit2) ? 1 : 0)
             .orElse(null);
     String maxEndCommit = result.getInputSplits().stream()
-            .map(split -> split.getInstantRange().get().getEndInstant().get())
+            .map(split -> 
split.getInstantRange().get().getEndInstantOpt().get())
             .max((commit1,commit2) -> compareTimestamps(commit1, GREATER_THAN, 
commit2) ? 1 : 0)
             .orElse(null);
     assertEquals(0, intervalBetween2Instants(commitsTimeline, minStartCommit, 
maxEndCommit), "Should read 1 instant");
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index ec6f3b863caf..bb28ee92e2df 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -505,8 +505,8 @@ public class TestStreamReadMonitoringFunction {
 
   private static boolean isPointInstantRange(InstantRange instantRange, String 
timestamp) {
     return instantRange != null
-        && Objects.equals(timestamp, instantRange.getStartInstant().get())
-        && Objects.equals(timestamp, instantRange.getEndInstant().get());
+        && Objects.equals(timestamp, instantRange.getStartInstantOpt().get())
+        && Objects.equals(timestamp, instantRange.getEndInstantOpt().get());
   }
 
   private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> 
createHarness(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
index 1cb244245ce8..8dd32af3adf3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplit.java
@@ -360,8 +360,8 @@ public class TestHoodieSourceSplit {
     );
 
     assertTrue(split.getInstantRange().isPresent());
-    assertEquals("20230101000000000", 
split.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230131235959999", 
split.getInstantRange().get().getEndInstant().get());
+    assertEquals("20230101000000000", 
split.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230131235959999", 
split.getInstantRange().get().getEndInstantOpt().get());
   }
 
   @Test
@@ -402,9 +402,9 @@ public class TestHoodieSourceSplit {
     );
 
     assertTrue(split.getInstantRange().isPresent());
-    assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
-    assertFalse(split.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230101000000000", 
split.getInstantRange().get().getStartInstant().get());
+    assertTrue(split.getInstantRange().get().getStartInstantOpt().isPresent());
+    assertFalse(split.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230101000000000", 
split.getInstantRange().get().getStartInstantOpt().get());
   }
 
   @Test
@@ -476,7 +476,7 @@ public class TestHoodieSourceSplit {
     );
 
     assertTrue(split.getInstantRange().isPresent());
-    assertTrue(split.getInstantRange().get().getStartInstant().isPresent());
-    assertTrue(split.getInstantRange().get().getEndInstant().isPresent());
+    assertTrue(split.getInstantRange().get().getStartInstantOpt().isPresent());
+    assertTrue(split.getInstantRange().get().getEndInstantOpt().isPresent());
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
index 523606bb5f76..d56d18648c29 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestHoodieSourceSplitSerializer.java
@@ -508,10 +508,10 @@ public class TestHoodieSourceSplitSerializer {
 
     assertNotNull(deserialized);
     assertTrue(deserialized.getInstantRange().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstantOpt().get());
   }
 
   @Test
@@ -539,9 +539,9 @@ public class TestHoodieSourceSplitSerializer {
 
     assertNotNull(deserialized);
     assertTrue(deserialized.getInstantRange().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-    
assertFalse(deserialized.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertFalse(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstantOpt().get());
   }
 
   @Test
@@ -569,10 +569,10 @@ public class TestHoodieSourceSplitSerializer {
 
     assertNotNull(deserialized);
     assertTrue(deserialized.getInstantRange().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstantOpt().get());
   }
 
   @Test
@@ -604,8 +604,8 @@ public class TestHoodieSourceSplitSerializer {
     assertTrue(deserialized.getInstantRange().isPresent());
     assertEquals(10, deserialized.getFileOffset());
     assertEquals(500L, deserialized.getConsumed());
-    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstantOpt().get());
   }
 
   @Test
@@ -636,13 +636,13 @@ public class TestHoodieSourceSplitSerializer {
 
     // Verify split1
     assertTrue(deserialized1.getInstantRange().isPresent());
-    assertEquals("20230101000000000", 
deserialized1.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230131235959999", 
deserialized1.getInstantRange().get().getEndInstant().get());
+    assertEquals("20230101000000000", 
deserialized1.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230131235959999", 
deserialized1.getInstantRange().get().getEndInstantOpt().get());
 
     // Verify split2
     assertTrue(deserialized2.getInstantRange().isPresent());
-    assertEquals("20230201000000000", 
deserialized2.getInstantRange().get().getStartInstant().get());
-    
assertFalse(deserialized2.getInstantRange().get().getEndInstant().isPresent());
+    assertEquals("20230201000000000", 
deserialized2.getInstantRange().get().getStartInstantOpt().get());
+    
assertFalse(deserialized2.getInstantRange().get().getEndInstantOpt().isPresent());
 
     // Verify split3
     assertFalse(deserialized3.getInstantRange().isPresent());
@@ -862,15 +862,15 @@ public class TestHoodieSourceSplitSerializer {
     assertTrue(deserialized2.getInstantRange().isPresent());
     assertEquals(InstantRange.RangeType.OPEN_CLOSED,
         deserialized2.getInstantRange().get().getRangeType());
-    assertEquals("20230201000000000", 
deserialized2.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230228235959999", 
deserialized2.getInstantRange().get().getEndInstant().get());
+    assertEquals("20230201000000000", 
deserialized2.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230228235959999", 
deserialized2.getInstantRange().get().getEndInstantOpt().get());
 
     // Verify split3 (CLOSED_CLOSED)
     assertTrue(deserialized3.getInstantRange().isPresent());
     assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
         deserialized3.getInstantRange().get().getRangeType());
-    assertEquals("20230301000000000", 
deserialized3.getInstantRange().get().getStartInstant().get());
-    assertEquals("20230331235959999", 
deserialized3.getInstantRange().get().getEndInstant().get());
+    assertEquals("20230301000000000", 
deserialized3.getInstantRange().get().getStartInstantOpt().get());
+    assertEquals("20230331235959999", 
deserialized3.getInstantRange().get().getEndInstantOpt().get());
   }
 
   @Test
@@ -953,9 +953,9 @@ public class TestHoodieSourceSplitSerializer {
     assertTrue(deserialized.getInstantRange().isPresent());
     assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
         deserialized.getInstantRange().get().getRangeType());
-    
assertTrue(deserialized.getInstantRange().get().getStartInstant().isPresent());
-    
assertFalse(deserialized.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstant().get());
+    
assertTrue(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertFalse(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230101000000000", 
deserialized.getInstantRange().get().getStartInstantOpt().get());
 
     // Verify range behavior - start is inclusive, no end boundary
     
assertTrue(deserialized.getInstantRange().get().isInRange("20230101000000000"));
 // start inclusive
@@ -991,9 +991,9 @@ public class TestHoodieSourceSplitSerializer {
     assertTrue(deserialized.getInstantRange().isPresent());
     assertEquals(InstantRange.RangeType.CLOSED_CLOSED,
         deserialized.getInstantRange().get().getRangeType());
-    
assertFalse(deserialized.getInstantRange().get().getStartInstant().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+    
assertFalse(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstantOpt().get());
 
     // Verify range behavior - no start boundary, end is inclusive
     
assertTrue(deserialized.getInstantRange().get().isInRange("19700101000000000"));
@@ -1028,9 +1028,9 @@ public class TestHoodieSourceSplitSerializer {
     assertTrue(deserialized.getInstantRange().isPresent());
     assertEquals(InstantRange.RangeType.OPEN_CLOSED,
         deserialized.getInstantRange().get().getRangeType());
-    
assertFalse(deserialized.getInstantRange().get().getStartInstant().isPresent());
-    
assertTrue(deserialized.getInstantRange().get().getEndInstant().isPresent());
-    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstant().get());
+    
assertFalse(deserialized.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertTrue(deserialized.getInstantRange().get().getEndInstantOpt().isPresent());
+    assertEquals("20230131235959999", 
deserialized.getInstantRange().get().getEndInstantOpt().get());
 
     // Verify range behavior - no start boundary, end is inclusive
     
assertTrue(deserialized.getInstantRange().get().isInRange("19700101000000000"));
@@ -1086,23 +1086,23 @@ public class TestHoodieSourceSplitSerializer {
     HoodieSourceSplit deserialized4 = 
serializer.deserialize(serializer.getVersion(), serialized4);
 
     // Verify OPEN_CLOSED with only start
-    
assertTrue(deserialized1.getInstantRange().get().getStartInstant().isPresent());
-    
assertFalse(deserialized1.getInstantRange().get().getEndInstant().isPresent());
+    
assertTrue(deserialized1.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertFalse(deserialized1.getInstantRange().get().getEndInstantOpt().isPresent());
     assertEquals(InstantRange.RangeType.OPEN_CLOSED, 
deserialized1.getInstantRange().get().getRangeType());
 
     // Verify OPEN_CLOSED with only end
-    
assertFalse(deserialized2.getInstantRange().get().getStartInstant().isPresent());
-    
assertTrue(deserialized2.getInstantRange().get().getEndInstant().isPresent());
+    
assertFalse(deserialized2.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertTrue(deserialized2.getInstantRange().get().getEndInstantOpt().isPresent());
     assertEquals(InstantRange.RangeType.OPEN_CLOSED, 
deserialized2.getInstantRange().get().getRangeType());
 
     // Verify CLOSED_CLOSED with only start
-    
assertTrue(deserialized3.getInstantRange().get().getStartInstant().isPresent());
-    
assertFalse(deserialized3.getInstantRange().get().getEndInstant().isPresent());
+    
assertTrue(deserialized3.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertFalse(deserialized3.getInstantRange().get().getEndInstantOpt().isPresent());
     assertEquals(InstantRange.RangeType.CLOSED_CLOSED, 
deserialized3.getInstantRange().get().getRangeType());
 
     // Verify CLOSED_CLOSED with only end
-    
assertFalse(deserialized4.getInstantRange().get().getStartInstant().isPresent());
-    
assertTrue(deserialized4.getInstantRange().get().getEndInstant().isPresent());
+    
assertFalse(deserialized4.getInstantRange().get().getStartInstantOpt().isPresent());
+    
assertTrue(deserialized4.getInstantRange().get().getEndInstantOpt().isPresent());
     assertEquals(InstantRange.RangeType.CLOSED_CLOSED, 
deserialized4.getInstantRange().get().getRangeType());
   }
 

Reply via email to