This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 55309f2f79 Remove split commit and some deprecated config for real-time protocol on controller (#11663)
55309f2f79 is described below

commit 55309f2f79ad33b82ad1648bdb113ee81c60ac46
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sat Sep 23 18:15:46 2023 -0700

    Remove split commit and some deprecated config for real-time protocol on controller (#11663)
---
 .../protocols/SegmentCompletionProtocol.java       | 54 ++--------------------
 .../apache/pinot/controller/ControllerConf.java    | 15 ------
 .../resources/LLCSegmentCompletionHandlers.java    |  4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  6 +--
 .../core/realtime/SegmentCompletionManager.java    | 13 +-----
 .../api/SegmentCompletionProtocolDeserTest.java    | 44 +++++-------------
 .../PinotLLCRealtimeSegmentManagerTest.java        |  2 -
 .../realtime/LLRealtimeSegmentDataManager.java     | 10 +---
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 43 +----------------
 ...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 17 -------
 ...ssageDecoderRealtimeClusterIntegrationTest.java |  8 +---
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  9 +---
 ...rDownloadLLCRealtimeClusterIntegrationTest.java |  4 --
 13 files changed, 28 insertions(+), 201 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 8293bcf4d3..3b0753919d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -108,7 +108,6 @@ public class SegmentCompletionProtocol {
   }
 
   public static final String STATUS_KEY = "status";
-  public static final String OFFSET_KEY = "offset";
   // Sent by controller in COMMIT message
   public static final String BUILD_TIME_KEY = "buildTimeSec";
   public static final String COMMIT_TYPE_KEY = "isSplitCommitType";
@@ -443,9 +442,7 @@ public class SegmentCompletionProtocol {
   @JsonIgnoreProperties(ignoreUnknown = true)
   public static class Response {
     private ControllerResponseStatus _status;
-    private long _offset = -1;
     private long _buildTimeSeconds = -1;
-    private boolean _splitCommit;
     private String _segmentLocation;
     private String _controllerVipUrl;
     private String _streamPartitionMsgOffset;
@@ -455,9 +452,7 @@ public class SegmentCompletionProtocol {
 
     public Response(Params params) {
       _status = params.getStatus();
-      _offset = params.getOffset();
       _buildTimeSeconds = params.getBuildTimeSeconds();
-      _splitCommit = params.isSplitCommit();
       _segmentLocation = params.getSegmentLocation();
       _controllerVipUrl = params.getControllerVipUrl();
       _streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
@@ -473,12 +468,6 @@ public class SegmentCompletionProtocol {
       _status = status;
     }
 
-    @Deprecated
-    @JsonProperty(OFFSET_KEY)
-    public long getOffset() {
-      return _offset;
-    }
-
     // This method is called in the server when the controller responds with
     // CATCH_UP response to segmentConsumed() API.
     @JsonProperty(STREAM_PARTITION_MSG_OFFSET_KEY)
@@ -490,12 +479,6 @@ public class SegmentCompletionProtocol {
       _streamPartitionMsgOffset = streamPartitionMsgOffset;
     }
 
-    @Deprecated
-    @JsonProperty(OFFSET_KEY)
-    public void setOffset(long offset) {
-      _offset = offset;
-    }
-
     @JsonProperty(BUILD_TIME_KEY)
     public long getBuildTimeSeconds() {
       return _buildTimeSeconds;
@@ -506,14 +489,11 @@ public class SegmentCompletionProtocol {
       _buildTimeSeconds = buildTimeSeconds;
     }
 
+    // Always return true for backward compatibility. Deprecated in 1.0
+    @Deprecated
     @JsonProperty(COMMIT_TYPE_KEY)
     public boolean isSplitCommit() {
-      return _splitCommit;
-    }
-
-    @JsonProperty(COMMIT_TYPE_KEY)
-    public void setSplitCommit(boolean splitCommit) {
-      _splitCommit = splitCommit;
+      return true;
     }
 
     @JsonProperty(CONTROLLER_VIP_URL_KEY)
@@ -556,18 +536,14 @@ public class SegmentCompletionProtocol {
 
     public static class Params {
       private ControllerResponseStatus _status;
-      private long _offset;
       private long _buildTimeSeconds;
-      private boolean _splitCommit;
       private String _segmentLocation;
       private String _controllerVipUrl;
       private String _streamPartitionMsgOffset;
 
       public Params() {
-        _offset = -1L;
         _status = ControllerResponseStatus.FAILED;
         _buildTimeSeconds = -1;
-        _splitCommit = false;
         _segmentLocation = null;
         _controllerVipUrl = null;
         _streamPartitionMsgOffset = null;
@@ -583,11 +559,6 @@ public class SegmentCompletionProtocol {
         return this;
       }
 
-      public Params withSplitCommit(boolean splitCommit) {
-        _splitCommit = splitCommit;
-        return this;
-      }
-
       public Params withControllerVipUrl(String controllerVipUrl) {
         _controllerVipUrl = controllerVipUrl;
         return this;
@@ -598,14 +569,8 @@ public class SegmentCompletionProtocol {
         return this;
       }
 
-      public Params withStreamPartitionMsgOffset(String offset) {
-        _streamPartitionMsgOffset = offset;
-        // TODO Issue 5359 Remove the block below once we have both parties be 
fine without _offset being present.
-        try {
-          _offset = Long.parseLong(_streamPartitionMsgOffset);
-        } catch (Exception e) {
-          // Ignore. If the receiver expects _offset, it will return an error 
to the sender.
-        }
+      public Params withStreamPartitionMsgOffset(String 
streamPartitionMsgOffset) {
+        _streamPartitionMsgOffset = streamPartitionMsgOffset;
         return this;
       }
 
@@ -613,19 +578,10 @@ public class SegmentCompletionProtocol {
         return _status;
       }
 
-      @Deprecated
-      private long getOffset() {
-        return _offset;
-      }
-
       public long getBuildTimeSeconds() {
         return _buildTimeSeconds;
       }
 
-      public boolean isSplitCommit() {
-        return _splitCommit;
-      }
-
       public String getSegmentLocation() {
         return _segmentLocation;
       }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 22a838faf1..fa6853191f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -264,7 +264,6 @@ public class ControllerConf extends PinotConfiguration {
   private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = 
"controller.realtime.segment.commit.timeoutSeconds";
   private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = 
"controller.deleted.segments.retentionInDays";
   public static final String TABLE_MIN_REPLICAS = "table.minReplicas";
-  public static final String ENABLE_SPLIT_COMMIT = 
"controller.enable.split.commit";
   private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
   private static final String JERSEY_ADMIN_IS_PRIMARY = 
"jersey.admin.isprimary";
   public static final String ACCESS_CONTROL_FACTORY_CLASS = 
"controller.admin.access.control.factory.class";
@@ -278,11 +277,6 @@ public class ControllerConf extends PinotConfiguration {
       "controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = 
"controller.enable.storage.quota.check";
   private static final String ENABLE_BATCH_MESSAGE_MODE = 
"controller.enable.batch.message.mode";
-  // It is used to disable the HLC realtime segment completion and disallow 
HLC table in the cluster. True by default.
-  // If it's set to false, existing HLC realtime tables will stop consumption, 
and creation of new HLC tables will be
-  // disallowed.
-  // Please make sure there is no HLC table running in the cluster before 
disallowing it.
-  public static final String ALLOW_HLC_TABLES = "controller.allow.hlc.tables";
   public static final String DIM_TABLE_MAX_SIZE = 
"controller.dimTable.maxSize";
 
   // Defines the kind of storage and the underlying PinotFS implementation
@@ -292,7 +286,6 @@ public class ControllerConf extends PinotConfiguration {
   private static final int DEFAULT_MINION_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
   private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
   private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
-  private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = true;
   private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
   private static final String DEFAULT_ACCESS_CONTROL_FACTORY_CLASS =
       "org.apache.pinot.controller.api.access.AllowAllAccessFactory";
@@ -346,10 +339,6 @@ public class ControllerConf extends PinotConfiguration {
     }
   }
 
-  public void setSplitCommit(boolean isSplitCommit) {
-    setProperty(ENABLE_SPLIT_COMMIT, isSplitCommit);
-  }
-
   public void setQueryConsolePath(String path) {
     setProperty(CONSOLE_WEBAPP_ROOT_PATH, path);
   }
@@ -490,10 +479,6 @@ public class ControllerConf extends PinotConfiguration {
     return super.toString();
   }
 
-  public boolean getAcceptSplitCommit() {
-    return getProperty(ENABLE_SPLIT_COMMIT, DEFAULT_ENABLE_SPLIT_COMMIT);
-  }
-
   public String getControllerVipHost() {
     return Optional.ofNullable(getProperty(CONTROLLER_VIP_HOST))
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 836f58052d..93fd7b1896 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -343,8 +343,8 @@ public class LLCSegmentCompletionHandlers {
     }
 
     response = segmentCompletionManager.segmentCommitEnd(requestParams, 
success, false, committingSegmentDescriptor);
-    LOGGER.info("Response to segmentCommit: instance={}  segment={} status={} 
offset={}, streamMsgOffset={}",
-        requestParams.getInstanceId(), requestParams.getSegmentName(), 
response.getStatus(), response.getOffset(),
+    LOGGER.info("Response to segmentCommit: instance={}, segment={}, 
status={}, streamMsgOffset={}",
+        requestParams.getInstanceId(), requestParams.getSegmentName(), 
response.getStatus(),
         response.getStreamPartitionMsgOffset());
 
     return response.toJsonString();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a1f49884c0..8611c251aa 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -209,10 +209,6 @@ public class PinotLLCRealtimeSegmentManager {
     return new FileUploadDownloadClient();
   }
 
-  public boolean getIsSplitCommitEnabled() {
-    return _controllerConf.getAcceptSplitCommit();
-  }
-
   /**
    * Using the ideal state and segment metadata, return a list of {@link 
PartitionGroupConsumptionStatus}
    * for latest segment of each partition group.
@@ -1471,7 +1467,7 @@ public class PinotLLCRealtimeSegmentManager {
       return 0L;
     }
 
-    if (!getIsSplitCommitEnabled() || !isTmpSegmentAsyncDeletionEnabled()) {
+    if (!isTmpSegmentAsyncDeletionEnabled()) {
       return 0L;
     }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 11134f342f..baab4b6ab7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -106,10 +106,6 @@ public class SegmentCompletionManager {
     }
   }
 
-  public boolean isSplitCommitEnabled() {
-    return _segmentManager.getIsSplitCommitEnabled();
-  }
-
   public String getControllerVipUrl() {
     return _segmentManager.getControllerVipUrl();
   }
@@ -391,7 +387,6 @@ public class SegmentCompletionManager {
     // We may need to add some time here to allow for getting the lock? For 
now 0
     // We may need to add some time for the committer come back to us (after 
the build)? For now 0.
     private long _maxTimeAllowedToCommitMs;
-    private final boolean _isSplitCommitEnabled;
     private final String _controllerVipUrl;
 
     public static SegmentCompletionFSM 
fsmInHolding(PinotLLCRealtimeSegmentManager segmentManager,
@@ -445,7 +440,6 @@ public class SegmentCompletionManager {
       }
       _initialCommitTimeMs = initialCommitTimeMs;
       _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
-      _isSplitCommitEnabled = segmentCompletionManager.isSplitCommitEnabled();
       _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
     }
 
@@ -463,7 +457,7 @@ public class SegmentCompletionManager {
     @Override
     public String toString() {
       return "{" + _segmentName.getSegmentName() + "," + _state + "," + 
_startTimeMs + "," + _winner + ","
-          + _winningOffset + "," + _isSplitCommitEnabled + "," + 
_controllerVipUrl + "}";
+          + _winningOffset + "," + _controllerVipUrl + "}";
     }
 
     // SegmentCompletionManager releases the FSM from the hashtable when it is 
done.
@@ -680,10 +674,7 @@ public class SegmentCompletionManager {
           new 
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
               .withBuildTimeSeconds(allowedBuildTimeSec)
               
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
-              .withSplitCommit(_isSplitCommitEnabled);
-      if (_isSplitCommitEnabled) {
-        params.withControllerVipUrl(_controllerVipUrl);
-      }
+              .withControllerVipUrl(_controllerVipUrl);
       return new SegmentCompletionProtocol.Response(params);
     }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
index 3420d069f8..b7ebe18058 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
@@ -26,7 +26,6 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
@@ -42,14 +41,13 @@ public class SegmentCompletionProtocolDeserTest {
     // Test with all params
     SegmentCompletionProtocol.Response.Params params =
         new 
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-            
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION).withSplitCommit(true)
+            
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS);
     assertEquals(new 
LongMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET), 0);
     assertEquals(response.getSegmentLocation(), SEGMENT_LOCATION);
-    assertTrue(response.isSplitCommit());
     assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
   }
 
@@ -65,7 +63,6 @@ public class SegmentCompletionProtocolDeserTest {
     assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS);
     assertEquals(new 
LongMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET), 0);
     assertNull(response.getSegmentLocation());
-    assertFalse(response.isSplitCommit());
     assertEquals(response.getStatus(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
   }
 
@@ -74,14 +71,14 @@ public class SegmentCompletionProtocolDeserTest {
     // Test with all params
     SegmentCompletionProtocol.Response.Params params =
         new 
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-            
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION).withSplitCommit(true)
+            
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION)
             .withControllerVipUrl(CONTROLLER_VIP_URL)
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
 
-    assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+    assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(), 
OFFSET.toString());
     assertEquals(jsonNode.get("segmentLocation").asText(), SEGMENT_LOCATION);
     assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
     assertEquals(jsonNode.get("status").asText(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
@@ -92,32 +89,15 @@ public class SegmentCompletionProtocolDeserTest {
   public void testJsonNullSegmentLocationAndVip() {
     SegmentCompletionProtocol.Response.Params params =
         new 
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-            
.withStreamPartitionMsgOffset(OFFSET.toString()).withSplitCommit(false)
-            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
-
-    SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
-    JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
-
-    assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
-    assertNull(jsonNode.get("segmentLocation"));
-    assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
-    assertEquals(jsonNode.get("status").asText(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
-    assertNull(jsonNode.get("controllerVipUrl"));
-  }
-
-  @Test
-  public void testJsonResponseWithoutSplitCommit() {
-    SegmentCompletionProtocol.Response.Params params =
-        new 
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-            
.withStreamPartitionMsgOffset(OFFSET.toString()).withSplitCommit(false)
+            .withStreamPartitionMsgOffset(OFFSET.toString())
             
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
 
-    assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+    assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(), 
OFFSET.toString());
     assertNull(jsonNode.get("segmentLocation"));
-    assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
+    assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
     assertEquals(jsonNode.get("status").asText(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
     assertNull(jsonNode.get("controllerVipUrl"));
   }
@@ -129,14 +109,14 @@ public class SegmentCompletionProtocolDeserTest {
     SegmentCompletionProtocol.Response.Params params =
         new 
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
             
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION)
-            
.withSplitCommit(false).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
 
-    assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+    assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(), 
OFFSET.toString());
     assertEquals(jsonNode.get("segmentLocation").asText(), SEGMENT_LOCATION);
-    assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
+    assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
     assertEquals(jsonNode.get("status").asText(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
     assertNull(jsonNode.get("controllerVipUrl"));
   }
@@ -148,14 +128,14 @@ public class SegmentCompletionProtocolDeserTest {
     SegmentCompletionProtocol.Response.Params params =
         new 
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
             
.withStreamPartitionMsgOffset(OFFSET.toString()).withControllerVipUrl(CONTROLLER_VIP_URL)
-            
.withSplitCommit(false).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
+            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
 
     SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(params);
     JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
 
-    assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+    assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(), 
OFFSET.toString());
     assertNull(jsonNode.get("segmentLocation"));
-    assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
+    assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
     assertEquals(jsonNode.get("status").asText(), 
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
     assertEquals(jsonNode.get("controllerVipUrl").asText(), 
CONTROLLER_VIP_URL);
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 9a53ad839e..ffc98692a0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -88,7 +88,6 @@ import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
-import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -1060,7 +1059,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // turn on knobs for async deletion of tmp files
     ControllerConf config = new ControllerConf();
     config.setDataDir(TEMP_DIR.toString());
-    config.setProperty(ENABLE_SPLIT_COMMIT, true);
     config.setProperty(TMP_SEGMENT_RETENTION_IN_SECONDS, Integer.MIN_VALUE);
     config.setProperty(ENABLE_TMP_SEGMENT_ASYNC_DELETION, true);
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index ffa16052ee..42e79e67dc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -757,8 +757,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
                 _state = State.ERROR;
                 _segmentLogger.error("Could not build segment for {}", 
_segmentNameStr);
               } else {
-                success = commitSegment(response.getControllerVipUrl(),
-                    response.isSplitCommit() && 
_indexLoadingConfig.isEnableSplitCommit());
+                success = commitSegment(response.getControllerVipUrl(), 
_indexLoadingConfig.isEnableSplitCommit());
                 if (success) {
                   _state = State.COMMITTED;
                 } else {
@@ -808,12 +807,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
   @VisibleForTesting
   protected StreamPartitionMsgOffset 
extractOffset(SegmentCompletionProtocol.Response response) {
-    if (response.getStreamPartitionMsgOffset() != null) {
-      return 
_streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
-    } else {
-      // TODO Issue 5359 Remove this once the protocol is upgraded on server 
and controller
-      return 
_streamPartitionMsgOffsetFactory.create(Long.toString(response.getOffset()));
-    }
+    return 
_streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
   }
 
   // Side effect: Modifies _segmentBuildDescriptor if we do not have a valid 
built segment file and we
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 18b2d707b1..333e1a8604 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -164,42 +164,6 @@ public class LLRealtimeSegmentDataManagerTest {
     SegmentBuildTimeLeaseExtender.shutdownExecutor();
   }
 
-  @Test
-  public void testOffsetParsing()
-      throws Exception {
-    final String offset = "34";
-    FakeLLRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager();
-    {
-      //  Controller sends catchup response with both offset and 
streamPartitionMsgOffset
-      String responseStr =
-          "{" + "  \"streamPartitionMsgOffset\" : \"" + offset + "\"," + "  
\"offset\" : " + offset + ","
-              + "  \"buildTimeSec\" : -1," + "  \"isSplitCommitType\" : false,"
-              + "  \"segmentLocation\" : \"file:///a/b\"," + "  \"status\" : 
\"CATCH_UP\"" + "}";
-      SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = 
segmentDataManager.extractOffset(response);
-      Assert.assertEquals(extractedOffset.compareTo(new 
LongMsgOffset(offset)), 0);
-    }
-    {
-      //  Controller sends catchup response with offset only
-      String responseStr =
-          "{" + "  \"offset\" : " + offset + "," + "  \"buildTimeSec\" : -1," 
+ "  \"isSplitCommitType\" : false,"
-              + "  \"segmentLocation\" : \"file:///a/b\"," + "  \"status\" : 
\"CATCH_UP\"" + "}";
-      SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = 
segmentDataManager.extractOffset(response);
-      Assert.assertEquals(extractedOffset.compareTo(new 
LongMsgOffset(offset)), 0);
-    }
-    {
-      //  Controller sends catchup response streamPartitionMsgOffset only
-      String responseStr = "{" + "  \"streamPartitionMsgOffset\" : \"" + 
offset + "\"," + "  \"buildTimeSec\" : -1,"
-          + "  \"isSplitCommitType\" : false," + "  \"segmentLocation\" : 
\"file:///a/b\","
-          + "  \"status\" : \"CATCH_UP\"" + "}";
-      SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = 
segmentDataManager.extractOffset(response);
-      Assert.assertEquals(extractedOffset.compareTo(new 
LongMsgOffset(offset)), 0);
-    }
-    segmentDataManager.destroy();
-  }
-
   // Test that we are in HOLDING state as long as the controller responds HOLD 
to our segmentConsumed() message.
   // we should not consume when holding.
   @Test
@@ -1026,8 +990,7 @@ public class LLRealtimeSegmentDataManagerTest {
     }
 
     public PartitionConsumer createPartitionConsumer() {
-      PartitionConsumer consumer = new PartitionConsumer();
-      return consumer;
+      return new PartitionConsumer();
     }
 
     public SegmentBuildDescriptor invokeBuildForCommit(long leaseTime) {
@@ -1036,9 +999,7 @@ public class LLRealtimeSegmentDataManagerTest {
     }
 
     public boolean invokeCommit() {
-      SegmentCompletionProtocol.Response response = 
mock(SegmentCompletionProtocol.Response.class);
-      when(response.isSplitCommit()).thenReturn(false);
-      return super.commitSegment(response.getControllerVipUrl(), false);
+      return super.commitSegment("dummyUrl", false);
     }
 
     private void terminateLoopIfNecessary() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index c899c07998..21711d08ea 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -20,9 +20,6 @@ package org.apache.pinot.integration.tests;
 
 import java.io.File;
 import java.util.List;
-import java.util.Map;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.utils.ReadMode;
 
 
 public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
@@ -32,20 +29,6 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
     return true;
   }
 
-  @Override
-  protected String getLoadMode() {
-    return ReadMode.mmap.name();
-  }
-
-  @Override
-  public void startController()
-      throws Exception {
-    Map<String, Object> properties = getDefaultControllerConfiguration();
-
-    properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
-    startController(properties);
-  }
-
   @Override
   protected void pushAvroIntoKafka(List<File> avroFiles)
       throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
index fd48bd6eb4..5553ce2a5b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
@@ -193,12 +192,7 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr
   @Override
   public void startController()
       throws Exception {
-    Map<String, Object> properties = getDefaultControllerConfiguration();
-
-    properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
-    properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
-
-    startController(properties);
+    super.startController();
     enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 88fe46e4e1..f71fc3a2c8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -39,7 +39,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -68,7 +67,6 @@ import static org.testng.Assert.assertTrue;
  */
 public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest {
   private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
-  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
   private static final long RANDOM_SEED = System.currentTimeMillis();
   private static final Random RANDOM = new Random(RANDOM_SEED);
 
@@ -91,12 +89,7 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
   @Override
   public void startController()
       throws Exception {
-    Map<String, Object> properties = getDefaultControllerConfiguration();
-
-    properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
-    properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
-
-    startController(properties);
+    super.startController();
     enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
index 964a854f98..f264bb367c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
@@ -53,8 +53,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES;
-import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -139,8 +137,6 @@ public class PeerDownloadLLCRealtimeClusterIntegrationTest extends BaseRealtimeC
   public void startController()
       throws Exception {
     Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
-    controllerConfig.put(ALLOW_HLC_TABLES, false);
-    controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit);
     // Override the data dir config.
     controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName());
     controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to