This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 55309f2f79 Remove split commit and some deprecated config for real-time protocol on controller (#11663)
55309f2f79 is described below
commit 55309f2f79ad33b82ad1648bdb113ee81c60ac46
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sat Sep 23 18:15:46 2023 -0700
Remove split commit and some deprecated config for real-time protocol on controller (#11663)
---
.../protocols/SegmentCompletionProtocol.java | 54 ++--------------------
.../apache/pinot/controller/ControllerConf.java | 15 ------
.../resources/LLCSegmentCompletionHandlers.java | 4 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 6 +--
.../core/realtime/SegmentCompletionManager.java | 13 +-----
.../api/SegmentCompletionProtocolDeserTest.java | 44 +++++-------------
.../PinotLLCRealtimeSegmentManagerTest.java | 2 -
.../realtime/LLRealtimeSegmentDataManager.java | 10 +---
.../realtime/LLRealtimeSegmentDataManagerTest.java | 43 +----------------
...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 17 -------
...ssageDecoderRealtimeClusterIntegrationTest.java | 8 +---
.../tests/LLCRealtimeClusterIntegrationTest.java | 9 +---
...rDownloadLLCRealtimeClusterIntegrationTest.java | 4 --
13 files changed, 28 insertions(+), 201 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 8293bcf4d3..3b0753919d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -108,7 +108,6 @@ public class SegmentCompletionProtocol {
}
public static final String STATUS_KEY = "status";
- public static final String OFFSET_KEY = "offset";
// Sent by controller in COMMIT message
public static final String BUILD_TIME_KEY = "buildTimeSec";
public static final String COMMIT_TYPE_KEY = "isSplitCommitType";
@@ -443,9 +442,7 @@ public class SegmentCompletionProtocol {
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Response {
private ControllerResponseStatus _status;
- private long _offset = -1;
private long _buildTimeSeconds = -1;
- private boolean _splitCommit;
private String _segmentLocation;
private String _controllerVipUrl;
private String _streamPartitionMsgOffset;
@@ -455,9 +452,7 @@ public class SegmentCompletionProtocol {
public Response(Params params) {
_status = params.getStatus();
- _offset = params.getOffset();
_buildTimeSeconds = params.getBuildTimeSeconds();
- _splitCommit = params.isSplitCommit();
_segmentLocation = params.getSegmentLocation();
_controllerVipUrl = params.getControllerVipUrl();
_streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
@@ -473,12 +468,6 @@ public class SegmentCompletionProtocol {
_status = status;
}
- @Deprecated
- @JsonProperty(OFFSET_KEY)
- public long getOffset() {
- return _offset;
- }
-
// This method is called in the server when the controller responds with
// CATCH_UP response to segmentConsumed() API.
@JsonProperty(STREAM_PARTITION_MSG_OFFSET_KEY)
@@ -490,12 +479,6 @@ public class SegmentCompletionProtocol {
_streamPartitionMsgOffset = streamPartitionMsgOffset;
}
- @Deprecated
- @JsonProperty(OFFSET_KEY)
- public void setOffset(long offset) {
- _offset = offset;
- }
-
@JsonProperty(BUILD_TIME_KEY)
public long getBuildTimeSeconds() {
return _buildTimeSeconds;
@@ -506,14 +489,11 @@ public class SegmentCompletionProtocol {
_buildTimeSeconds = buildTimeSeconds;
}
+ // Always return true for backward compatibility. Deprecated in 1.0
+ @Deprecated
@JsonProperty(COMMIT_TYPE_KEY)
public boolean isSplitCommit() {
- return _splitCommit;
- }
-
- @JsonProperty(COMMIT_TYPE_KEY)
- public void setSplitCommit(boolean splitCommit) {
- _splitCommit = splitCommit;
+ return true;
}
@JsonProperty(CONTROLLER_VIP_URL_KEY)
@@ -556,18 +536,14 @@ public class SegmentCompletionProtocol {
public static class Params {
private ControllerResponseStatus _status;
- private long _offset;
private long _buildTimeSeconds;
- private boolean _splitCommit;
private String _segmentLocation;
private String _controllerVipUrl;
private String _streamPartitionMsgOffset;
public Params() {
- _offset = -1L;
_status = ControllerResponseStatus.FAILED;
_buildTimeSeconds = -1;
- _splitCommit = false;
_segmentLocation = null;
_controllerVipUrl = null;
_streamPartitionMsgOffset = null;
@@ -583,11 +559,6 @@ public class SegmentCompletionProtocol {
return this;
}
- public Params withSplitCommit(boolean splitCommit) {
- _splitCommit = splitCommit;
- return this;
- }
-
public Params withControllerVipUrl(String controllerVipUrl) {
_controllerVipUrl = controllerVipUrl;
return this;
@@ -598,14 +569,8 @@ public class SegmentCompletionProtocol {
return this;
}
- public Params withStreamPartitionMsgOffset(String offset) {
- _streamPartitionMsgOffset = offset;
- // TODO Issue 5359 Remove the block below once we have both parties be
fine without _offset being present.
- try {
- _offset = Long.parseLong(_streamPartitionMsgOffset);
- } catch (Exception e) {
- // Ignore. If the receiver expects _offset, it will return an error
to the sender.
- }
+ public Params withStreamPartitionMsgOffset(String
streamPartitionMsgOffset) {
+ _streamPartitionMsgOffset = streamPartitionMsgOffset;
return this;
}
@@ -613,19 +578,10 @@ public class SegmentCompletionProtocol {
return _status;
}
- @Deprecated
- private long getOffset() {
- return _offset;
- }
-
public long getBuildTimeSeconds() {
return _buildTimeSeconds;
}
- public boolean isSplitCommit() {
- return _splitCommit;
- }
-
public String getSegmentLocation() {
return _segmentLocation;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 22a838faf1..fa6853191f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -264,7 +264,6 @@ public class ControllerConf extends PinotConfiguration {
private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS =
"controller.realtime.segment.commit.timeoutSeconds";
private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS =
"controller.deleted.segments.retentionInDays";
public static final String TABLE_MIN_REPLICAS = "table.minReplicas";
- public static final String ENABLE_SPLIT_COMMIT =
"controller.enable.split.commit";
private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
private static final String JERSEY_ADMIN_IS_PRIMARY =
"jersey.admin.isprimary";
public static final String ACCESS_CONTROL_FACTORY_CLASS =
"controller.admin.access.control.factory.class";
@@ -278,11 +277,6 @@ public class ControllerConf extends PinotConfiguration {
"controller.realtime.segment.metadata.commit.numLocks";
private static final String ENABLE_STORAGE_QUOTA_CHECK =
"controller.enable.storage.quota.check";
private static final String ENABLE_BATCH_MESSAGE_MODE =
"controller.enable.batch.message.mode";
- // It is used to disable the HLC realtime segment completion and disallow
HLC table in the cluster. True by default.
- // If it's set to false, existing HLC realtime tables will stop consumption,
and creation of new HLC tables will be
- // disallowed.
- // Please make sure there is no HLC table running in the cluster before
disallowing it.
- public static final String ALLOW_HLC_TABLES = "controller.allow.hlc.tables";
public static final String DIM_TABLE_MAX_SIZE =
"controller.dimTable.maxSize";
// Defines the kind of storage and the underlying PinotFS implementation
@@ -292,7 +286,6 @@ public class ControllerConf extends PinotConfiguration {
private static final int DEFAULT_MINION_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
- private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = true;
private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
private static final String DEFAULT_ACCESS_CONTROL_FACTORY_CLASS =
"org.apache.pinot.controller.api.access.AllowAllAccessFactory";
@@ -346,10 +339,6 @@ public class ControllerConf extends PinotConfiguration {
}
}
- public void setSplitCommit(boolean isSplitCommit) {
- setProperty(ENABLE_SPLIT_COMMIT, isSplitCommit);
- }
-
public void setQueryConsolePath(String path) {
setProperty(CONSOLE_WEBAPP_ROOT_PATH, path);
}
@@ -490,10 +479,6 @@ public class ControllerConf extends PinotConfiguration {
return super.toString();
}
- public boolean getAcceptSplitCommit() {
- return getProperty(ENABLE_SPLIT_COMMIT, DEFAULT_ENABLE_SPLIT_COMMIT);
- }
-
public String getControllerVipHost() {
return Optional.ofNullable(getProperty(CONTROLLER_VIP_HOST))
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 836f58052d..93fd7b1896 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -343,8 +343,8 @@ public class LLCSegmentCompletionHandlers {
}
response = segmentCompletionManager.segmentCommitEnd(requestParams,
success, false, committingSegmentDescriptor);
- LOGGER.info("Response to segmentCommit: instance={} segment={} status={}
offset={}, streamMsgOffset={}",
- requestParams.getInstanceId(), requestParams.getSegmentName(),
response.getStatus(), response.getOffset(),
+ LOGGER.info("Response to segmentCommit: instance={}, segment={},
status={}, streamMsgOffset={}",
+ requestParams.getInstanceId(), requestParams.getSegmentName(),
response.getStatus(),
response.getStreamPartitionMsgOffset());
return response.toJsonString();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a1f49884c0..8611c251aa 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -209,10 +209,6 @@ public class PinotLLCRealtimeSegmentManager {
return new FileUploadDownloadClient();
}
- public boolean getIsSplitCommitEnabled() {
- return _controllerConf.getAcceptSplitCommit();
- }
-
/**
* Using the ideal state and segment metadata, return a list of {@link
PartitionGroupConsumptionStatus}
* for latest segment of each partition group.
@@ -1471,7 +1467,7 @@ public class PinotLLCRealtimeSegmentManager {
return 0L;
}
- if (!getIsSplitCommitEnabled() || !isTmpSegmentAsyncDeletionEnabled()) {
+ if (!isTmpSegmentAsyncDeletionEnabled()) {
return 0L;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 11134f342f..baab4b6ab7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -106,10 +106,6 @@ public class SegmentCompletionManager {
}
}
- public boolean isSplitCommitEnabled() {
- return _segmentManager.getIsSplitCommitEnabled();
- }
-
public String getControllerVipUrl() {
return _segmentManager.getControllerVipUrl();
}
@@ -391,7 +387,6 @@ public class SegmentCompletionManager {
// We may need to add some time here to allow for getting the lock? For
now 0
// We may need to add some time for the committer come back to us (after
the build)? For now 0.
private long _maxTimeAllowedToCommitMs;
- private final boolean _isSplitCommitEnabled;
private final String _controllerVipUrl;
public static SegmentCompletionFSM
fsmInHolding(PinotLLCRealtimeSegmentManager segmentManager,
@@ -445,7 +440,6 @@ public class SegmentCompletionManager {
}
_initialCommitTimeMs = initialCommitTimeMs;
_maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
- _isSplitCommitEnabled = segmentCompletionManager.isSplitCommitEnabled();
_controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
}
@@ -463,7 +457,7 @@ public class SegmentCompletionManager {
@Override
public String toString() {
return "{" + _segmentName.getSegmentName() + "," + _state + "," +
_startTimeMs + "," + _winner + ","
- + _winningOffset + "," + _isSplitCommitEnabled + "," +
_controllerVipUrl + "}";
+ + _winningOffset + "," + _controllerVipUrl + "}";
}
// SegmentCompletionManager releases the FSM from the hashtable when it is
done.
@@ -680,10 +674,7 @@ public class SegmentCompletionManager {
new
SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(offset.toString())
.withBuildTimeSeconds(allowedBuildTimeSec)
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
- .withSplitCommit(_isSplitCommitEnabled);
- if (_isSplitCommitEnabled) {
- params.withControllerVipUrl(_controllerVipUrl);
- }
+ .withControllerVipUrl(_controllerVipUrl);
return new SegmentCompletionProtocol.Response(params);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
index 3420d069f8..b7ebe18058 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/SegmentCompletionProtocolDeserTest.java
@@ -26,7 +26,6 @@ import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -42,14 +41,13 @@ public class SegmentCompletionProtocolDeserTest {
// Test with all params
SegmentCompletionProtocol.Response.Params params =
new
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION).withSplitCommit(true)
+
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION)
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(params);
assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS);
assertEquals(new
LongMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET), 0);
assertEquals(response.getSegmentLocation(), SEGMENT_LOCATION);
- assertTrue(response.isSplitCommit());
assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
}
@@ -65,7 +63,6 @@ public class SegmentCompletionProtocolDeserTest {
assertEquals(response.getBuildTimeSeconds(), BUILD_TIME_MILLIS);
assertEquals(new
LongMsgOffset(response.getStreamPartitionMsgOffset()).compareTo(OFFSET), 0);
assertNull(response.getSegmentLocation());
- assertFalse(response.isSplitCommit());
assertEquals(response.getStatus(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
}
@@ -74,14 +71,14 @@ public class SegmentCompletionProtocolDeserTest {
// Test with all params
SegmentCompletionProtocol.Response.Params params =
new
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION).withSplitCommit(true)
+
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION)
.withControllerVipUrl(CONTROLLER_VIP_URL)
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(params);
JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
- assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+ assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(),
OFFSET.toString());
assertEquals(jsonNode.get("segmentLocation").asText(), SEGMENT_LOCATION);
assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
assertEquals(jsonNode.get("status").asText(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
@@ -92,32 +89,15 @@ public class SegmentCompletionProtocolDeserTest {
public void testJsonNullSegmentLocationAndVip() {
SegmentCompletionProtocol.Response.Params params =
new
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-
.withStreamPartitionMsgOffset(OFFSET.toString()).withSplitCommit(false)
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
-
- SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(params);
- JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
-
- assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
- assertNull(jsonNode.get("segmentLocation"));
- assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
- assertEquals(jsonNode.get("status").asText(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
- assertNull(jsonNode.get("controllerVipUrl"));
- }
-
- @Test
- public void testJsonResponseWithoutSplitCommit() {
- SegmentCompletionProtocol.Response.Params params =
- new
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
-
.withStreamPartitionMsgOffset(OFFSET.toString()).withSplitCommit(false)
+ .withStreamPartitionMsgOffset(OFFSET.toString())
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(params);
JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
- assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+ assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(),
OFFSET.toString());
assertNull(jsonNode.get("segmentLocation"));
- assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
+ assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
assertEquals(jsonNode.get("status").asText(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
assertNull(jsonNode.get("controllerVipUrl"));
}
@@ -129,14 +109,14 @@ public class SegmentCompletionProtocolDeserTest {
SegmentCompletionProtocol.Response.Params params =
new
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
.withStreamPartitionMsgOffset(OFFSET.toString()).withSegmentLocation(SEGMENT_LOCATION)
-
.withSplitCommit(false).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(params);
JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
- assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+ assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(),
OFFSET.toString());
assertEquals(jsonNode.get("segmentLocation").asText(), SEGMENT_LOCATION);
- assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
+ assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
assertEquals(jsonNode.get("status").asText(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
assertNull(jsonNode.get("controllerVipUrl"));
}
@@ -148,14 +128,14 @@ public class SegmentCompletionProtocolDeserTest {
SegmentCompletionProtocol.Response.Params params =
new
SegmentCompletionProtocol.Response.Params().withBuildTimeSeconds(BUILD_TIME_MILLIS)
.withStreamPartitionMsgOffset(OFFSET.toString()).withControllerVipUrl(CONTROLLER_VIP_URL)
-
.withSplitCommit(false).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(params);
JsonNode jsonNode = JsonUtils.objectToJsonNode(response);
- assertEquals(jsonNode.get("offset").asText(), OFFSET.toString());
+ assertEquals(jsonNode.get("streamPartitionMsgOffset").asText(),
OFFSET.toString());
assertNull(jsonNode.get("segmentLocation"));
- assertFalse(jsonNode.get("isSplitCommitType").asBoolean());
+ assertTrue(jsonNode.get("isSplitCommitType").asBoolean());
assertEquals(jsonNode.get("status").asText(),
SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.toString());
assertEquals(jsonNode.get("controllerVipUrl").asText(),
CONTROLLER_VIP_URL);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 9a53ad839e..ffc98692a0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -88,7 +88,6 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
-import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
import static
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -1060,7 +1059,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
// turn on knobs for async deletion of tmp files
ControllerConf config = new ControllerConf();
config.setDataDir(TEMP_DIR.toString());
- config.setProperty(ENABLE_SPLIT_COMMIT, true);
config.setProperty(TMP_SEGMENT_RETENTION_IN_SECONDS, Integer.MIN_VALUE);
config.setProperty(ENABLE_TMP_SEGMENT_ASYNC_DELETION, true);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index ffa16052ee..42e79e67dc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -757,8 +757,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_state = State.ERROR;
_segmentLogger.error("Could not build segment for {}",
_segmentNameStr);
} else {
- success = commitSegment(response.getControllerVipUrl(),
- response.isSplitCommit() &&
_indexLoadingConfig.isEnableSplitCommit());
+ success = commitSegment(response.getControllerVipUrl(),
_indexLoadingConfig.isEnableSplitCommit());
if (success) {
_state = State.COMMITTED;
} else {
@@ -808,12 +807,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
@VisibleForTesting
protected StreamPartitionMsgOffset
extractOffset(SegmentCompletionProtocol.Response response) {
- if (response.getStreamPartitionMsgOffset() != null) {
- return
_streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
- } else {
- // TODO Issue 5359 Remove this once the protocol is upgraded on server
and controller
- return
_streamPartitionMsgOffsetFactory.create(Long.toString(response.getOffset()));
- }
+ return
_streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
}
// Side effect: Modifies _segmentBuildDescriptor if we do not have a valid
built segment file and we
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 18b2d707b1..333e1a8604 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -164,42 +164,6 @@ public class LLRealtimeSegmentDataManagerTest {
SegmentBuildTimeLeaseExtender.shutdownExecutor();
}
- @Test
- public void testOffsetParsing()
- throws Exception {
- final String offset = "34";
- FakeLLRealtimeSegmentDataManager segmentDataManager =
createFakeSegmentManager();
- {
- // Controller sends catchup response with both offset and
streamPartitionMsgOffset
- String responseStr =
- "{" + " \"streamPartitionMsgOffset\" : \"" + offset + "\"," + "
\"offset\" : " + offset + ","
- + " \"buildTimeSec\" : -1," + " \"isSplitCommitType\" : false,"
- + " \"segmentLocation\" : \"file:///a/b\"," + " \"status\" :
\"CATCH_UP\"" + "}";
- SegmentCompletionProtocol.Response response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
- StreamPartitionMsgOffset extractedOffset =
segmentDataManager.extractOffset(response);
- Assert.assertEquals(extractedOffset.compareTo(new
LongMsgOffset(offset)), 0);
- }
- {
- // Controller sends catchup response with offset only
- String responseStr =
- "{" + " \"offset\" : " + offset + "," + " \"buildTimeSec\" : -1,"
+ " \"isSplitCommitType\" : false,"
- + " \"segmentLocation\" : \"file:///a/b\"," + " \"status\" :
\"CATCH_UP\"" + "}";
- SegmentCompletionProtocol.Response response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
- StreamPartitionMsgOffset extractedOffset =
segmentDataManager.extractOffset(response);
- Assert.assertEquals(extractedOffset.compareTo(new
LongMsgOffset(offset)), 0);
- }
- {
- // Controller sends catchup response streamPartitionMsgOffset only
- String responseStr = "{" + " \"streamPartitionMsgOffset\" : \"" +
offset + "\"," + " \"buildTimeSec\" : -1,"
- + " \"isSplitCommitType\" : false," + " \"segmentLocation\" :
\"file:///a/b\","
- + " \"status\" : \"CATCH_UP\"" + "}";
- SegmentCompletionProtocol.Response response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
- StreamPartitionMsgOffset extractedOffset =
segmentDataManager.extractOffset(response);
- Assert.assertEquals(extractedOffset.compareTo(new
LongMsgOffset(offset)), 0);
- }
- segmentDataManager.destroy();
- }
-
// Test that we are in HOLDING state as long as the controller responds HOLD
to our segmentConsumed() message.
// we should not consume when holding.
@Test
@@ -1026,8 +990,7 @@ public class LLRealtimeSegmentDataManagerTest {
}
public PartitionConsumer createPartitionConsumer() {
- PartitionConsumer consumer = new PartitionConsumer();
- return consumer;
+ return new PartitionConsumer();
}
public SegmentBuildDescriptor invokeBuildForCommit(long leaseTime) {
@@ -1036,9 +999,7 @@ public class LLRealtimeSegmentDataManagerTest {
}
public boolean invokeCommit() {
- SegmentCompletionProtocol.Response response =
mock(SegmentCompletionProtocol.Response.class);
- when(response.isSplitCommit()).thenReturn(false);
- return super.commitSegment(response.getControllerVipUrl(), false);
+ return super.commitSegment("dummyUrl", false);
}
private void terminateLoopIfNecessary() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index c899c07998..21711d08ea 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -20,9 +20,6 @@ package org.apache.pinot.integration.tests;
import java.io.File;
import java.util.List;
-import java.util.Map;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.spi.utils.ReadMode;
public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
@@ -32,20 +29,6 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest
extends BaseRealtime
return true;
}
- @Override
- protected String getLoadMode() {
- return ReadMode.mmap.name();
- }
-
- @Override
- public void startController()
- throws Exception {
- Map<String, Object> properties = getDefaultControllerConfiguration();
-
- properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
- startController(properties);
- }
-
@Override
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
index fd48bd6eb4..5553ce2a5b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
@@ -193,12 +192,7 @@ public class
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr
@Override
public void startController()
throws Exception {
- Map<String, Object> properties = getDefaultControllerConfiguration();
-
- properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
- properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
-
- startController(properties);
+ super.startController();
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 88fe46e4e1..f71fc3a2c8 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -39,7 +39,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -68,7 +67,6 @@ import static org.testng.Assert.assertTrue;
*/
public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
- private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
Collections.singletonList("DivActualElapsedTime");
private static final long RANDOM_SEED = System.currentTimeMillis();
private static final Random RANDOM = new Random(RANDOM_SEED);
@@ -91,12 +89,7 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
@Override
public void startController()
throws Exception {
- Map<String, Object> properties = getDefaultControllerConfiguration();
-
- properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
- properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
-
- startController(properties);
+ super.startController();
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
index 964a854f98..f264bb367c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java
@@ -53,8 +53,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES;
-import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -139,8 +137,6 @@ public class PeerDownloadLLCRealtimeClusterIntegrationTest
extends BaseRealtimeC
public void startController()
throws Exception {
Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
- controllerConfig.put(ALLOW_HLC_TABLES, false);
- controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit);
// Override the data dir config.
controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" +
getHelixClusterName());
controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR,
FileUtils.getTempDirectory().getAbsolutePath());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]