This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f89971 Interface changes for Kinesis connector (#6667)
2f89971 is described below
commit 2f8997198a7dd8037dc2c0165eb567c9c0a98c4b
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Apr 6 17:58:17 2021 -0700
Interface changes for Kinesis connector (#6667)
Part 2 of the Kinesis implementation changes. #6518 is the main PR, which
is being split into 3 parts in order to reduce the scope of review.
In this PR, an abstraction has been introduced for a group of
partitions/shards, and each consumer will now be responsible for a
PartitionGroup instead of just a partitionId (though in the first iteration, the
PartitionGroup will only contain 1 partition/shard). It includes corresponding
additions to existing interfaces, as well as some new interfaces.
Kafka stream should function as is, with no changes needed.
Note that before upgrading to this change, installations should upgrade to
at least the release that contains #5359
(https://docs.pinot.apache.org/basics/releases/0.5.0). An installation running a
version older than that may break during the upgrade.
---
.../segment/LLCRealtimeSegmentZKMetadata.java | 5 -
.../protocols/SegmentCompletionProtocol.java | 66 +++--
.../org/apache/pinot/common/utils/URIUtils.java | 26 ++
.../protocols/SegmentCompletionProtocolTest.java | 135 ++++++++++
.../apache/pinot/common/utils/URIUtilsTest.java | 28 ++
.../helix/core/PinotTableIdealStateBuilder.java | 51 +++-
.../realtime/PinotLLCRealtimeSegmentManager.java | 291 +++++++++++++--------
.../core/realtime/SegmentCompletionManager.java | 4 +-
.../SegmentSizeBasedFlushThresholdUpdater.java | 1 +
.../PinotLLCRealtimeSegmentManagerTest.java | 205 +++++++++++----
.../realtime/LLRealtimeSegmentDataManager.java | 54 ++--
.../realtime/LLRealtimeSegmentDataManagerTest.java | 2 +
.../org/apache/pinot/spi/stream/LongMsgOffset.java | 5 +
.../org/apache/pinot/spi/stream/MessageBatch.java | 7 +
.../pinot/spi/stream/PartitionGroupConsumer.java | 43 +++
.../stream/PartitionGroupConsumptionStatus.java | 86 ++++++
...gMsgOffset.java => PartitionGroupMetadata.java} | 44 ++--
...her.java => PartitionGroupMetadataFetcher.java} | 37 +--
.../pinot/spi/stream/PartitionLevelConsumer.java | 2 +-
.../pinot/spi/stream/PartitionOffsetFetcher.java | 87 ------
.../pinot/spi/stream/StreamConsumerFactory.java | 10 +-
.../pinot/spi/stream/StreamMetadataProvider.java | 47 +++-
.../pinot/spi/stream/StreamPartitionMsgOffset.java | 12 +-
23 files changed, 899 insertions(+), 349 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
index b8b8d95..7cb19a7 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
@@ -87,11 +87,6 @@ public class LLCRealtimeSegmentZKMetadata extends
RealtimeSegmentZKMetadata {
public ZNRecord toZNRecord() {
ZNRecord znRecord = super.toZNRecord();
znRecord.setSimpleField(START_OFFSET, _startOffset);
- if (_endOffset == null) {
- // TODO Issue 5359 Keep this until all components have upgraded to a
version that can handle _offset being null
- // For backward compatibility until all components have been upgraded to
deal with null value for _endOffset
- _endOffset = Long.toString(Long.MAX_VALUE);
- }
znRecord.setSimpleField(END_OFFSET, _endOffset);
znRecord.setIntField(NUM_REPLICAS, _numReplicas);
znRecord.setSimpleField(DOWNLOAD_URL, _downloadUrl);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 04f300b..3fa2c2c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -19,12 +19,14 @@
package org.apache.pinot.common.protocols;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -135,6 +137,7 @@ public class SegmentCompletionProtocol {
public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason
sent by server as max num rows reached
public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason
sent by server as max time reached
+ public static final String REASON_END_OF_PARTITION_GROUP =
"endOfPartitionGroup"; // Stop reason sent by server as end of partitionGroup
reached
// Canned responses
public static final Response RESP_NOT_LEADER =
@@ -180,20 +183,39 @@ public class SegmentCompletionProtocol {
}
public String getUrl(String hostPort, String protocol) {
- return protocol + "://" + hostPort + "/" + _msgType + "?" +
PARAM_SEGMENT_NAME + "=" + _params.getSegmentName()
- + "&" + PARAM_OFFSET + "=" + _params.getOffset() + "&" +
PARAM_INSTANCE_ID + "=" + _params.getInstanceId() + (
- _params.getReason() == null ? "" : ("&" + PARAM_REASON + "=" +
_params.getReason())) + (
- _params.getBuildTimeMillis() <= 0 ? "" : ("&" +
PARAM_BUILD_TIME_MILLIS + "=" + _params.getBuildTimeMillis()))
- + (_params.getWaitTimeMillis() <= 0 ? "" : ("&" +
PARAM_WAIT_TIME_MILLIS + "=" + _params.getWaitTimeMillis()))
- + (_params.getExtraTimeSec() <= 0 ? "" : ("&" + PARAM_EXTRA_TIME_SEC
+ "=" + _params.getExtraTimeSec())) + (
- _params.getMemoryUsedBytes() <= 0 ? "" : ("&" +
PARAM_MEMORY_USED_BYTES + "=" + _params.getMemoryUsedBytes()))
- + (_params.getSegmentSizeBytes() <= 0 ? ""
- : ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" +
_params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? ""
- : ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) +
(_params.getSegmentLocation() == null ? ""
- : ("&" + PARAM_SEGMENT_LOCATION + "=" +
_params.getSegmentLocation()))
- + (_params.getStreamPartitionMsgOffset() == null ? ""
- : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" +
_params.getStreamPartitionMsgOffset()))
- ;
+
+ Map<String, String> params = new HashMap<>();
+ params.put(PARAM_SEGMENT_NAME, _params.getSegmentName());
+ params.put(PARAM_OFFSET, String.valueOf(_params.getOffset()));
+ params.put(PARAM_INSTANCE_ID, _params.getInstanceId());
+ if (_params.getReason() != null) {
+ params.put(PARAM_REASON, _params.getReason());
+ }
+ if ( _params.getBuildTimeMillis() > 0) {
+ params.put(PARAM_BUILD_TIME_MILLIS,
String.valueOf(_params.getBuildTimeMillis()));
+ }
+ if ( _params.getWaitTimeMillis() > 0) {
+ params.put(PARAM_WAIT_TIME_MILLIS,
String.valueOf(_params.getWaitTimeMillis()));
+ }
+ if ( _params.getExtraTimeSec() > 0) {
+ params.put(PARAM_EXTRA_TIME_SEC,
String.valueOf(_params.getExtraTimeSec()));
+ }
+ if ( _params.getMemoryUsedBytes() > 0) {
+ params.put(PARAM_MEMORY_USED_BYTES,
String.valueOf(_params.getMemoryUsedBytes()));
+ }
+ if ( _params.getSegmentSizeBytes() > 0) {
+ params.put(PARAM_SEGMENT_SIZE_BYTES,
String.valueOf(_params.getSegmentSizeBytes()));
+ }
+ if ( _params.getNumRows() > 0) {
+ params.put(PARAM_ROW_COUNT, String.valueOf(_params.getNumRows()));
+ }
+ if (_params.getSegmentLocation() != null) {
+ params.put(PARAM_SEGMENT_LOCATION, _params.getSegmentLocation());
+ }
+ if (_params.getStreamPartitionMsgOffset() != null) {
+ params.put(PARAM_STREAM_PARTITION_MSG_OFFSET,
_params.getStreamPartitionMsgOffset());
+ }
+ return URIUtils.buildURI(protocol, hostPort, _msgType,
params).toString();
}
public static class Params {
@@ -296,13 +318,6 @@ public class SegmentCompletionProtocol {
public Params withStreamPartitionMsgOffset(String offset) {
_streamPartitionMsgOffset = offset;
- // Try to populate the offset if possible.
- // TODO Issue 5359 Remove this code once we have both sides be able to
live without _offset.
- try {
- _offset = Long.parseLong(_streamPartitionMsgOffset);
- } catch (Exception e) {
- // Ignore, if the recipient excepts _offset, it will return an error
to the sender.
- }
return this;
}
@@ -352,12 +367,7 @@ public class SegmentCompletionProtocol {
}
public String getStreamPartitionMsgOffset() {
- if (_streamPartitionMsgOffset != null) {
- return _streamPartitionMsgOffset;
- } else {
- // TODO 5359 remove this once we are all upgraded in controllers and
servers.
- return Long.toString(_offset);
- }
+ return _streamPartitionMsgOffset;
}
public String toString() {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
index b5f0aeb..d39c3b6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
@@ -23,7 +23,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.util.Map;
import java.util.StringJoiner;
+import org.apache.http.client.utils.URIBuilder;
public class URIUtils {
@@ -85,4 +87,28 @@ public class URIUtils {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Builds the URI using the schema, host, port, path and map of params.
+ * The URI builder automatically encodes fields as needed
+ */
+ public static URI buildURI(String schema, String hostPort, String path,
Map<String, String> params) {
+ URIBuilder uriBuilder = new
URIBuilder().setScheme(schema).setHost(hostPort).setPath(path);
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ uriBuilder.addParameter(entry.getKey(), entry.getValue());
+ }
+ try {
+ return uriBuilder.build();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Builds the URI using the schema, host, port, path and map of params.
+ * The URI builder automatically encodes fields as needed
+ */
+ public static URI buildURI(String schema, String host, int port, String
path, Map<String, String> params) {
+ return buildURI(schema, String.format("%s:%d", host, port), path, params);
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
new file mode 100644
index 0000000..2f5edb8
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/protocols/SegmentCompletionProtocolTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.protocols;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.utils.URIUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentCompletionProtocolTest {
+
+ @Test
+ public void testRequestURL()
+ throws Exception {
+
+ // test default params
+ SegmentCompletionProtocol.Request.Params params = new
SegmentCompletionProtocol.Request.Params();
+ SegmentCompletionProtocol.ExtendBuildTimeRequest extendBuildTimeRequest =
+ new SegmentCompletionProtocol.ExtendBuildTimeRequest(params);
+ URI uri = new URI(extendBuildTimeRequest.getUrl("localhost:8080", "http"));
+ Assert.assertEquals(uri.getScheme(), "http");
+ Assert.assertEquals(uri.getHost(), "localhost");
+ Assert.assertEquals(uri.getPort(), 8080);
+ Assert.assertEquals(uri.getPath(), String.format("/%s",
SegmentCompletionProtocol.MSG_TYPE_EXTEND_BUILD_TIME));
+ Map<String, String> paramsMap =
+ Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e ->
e.split("=")[0], e -> e.split("=")[1]));
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
"UNKNOWN_SEGMENT");
+ Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
"UNKNOWN_INSTANCE");
+ Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_ROW_COUNT));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET));
+
+ // test that params set only if valid values
+ params = new
SegmentCompletionProtocol.Request.Params().withSegmentName("foo__0__0__12345Z")
+
.withInstanceId("Server_localhost_8099").withReason(null).withBuildTimeMillis(-100).withWaitTimeMillis(0)
+
.withExtraTimeSec(-1).withMemoryUsedBytes(0).withSegmentSizeBytes(-12345).withNumRows(0)
+ .withSegmentLocation(null).withStreamPartitionMsgOffset(null);
+ SegmentCompletionProtocol.SegmentConsumedRequest segmentConsumedRequest =
+ new SegmentCompletionProtocol.SegmentConsumedRequest(params);
+ uri = new URI(segmentConsumedRequest.getUrl("localhost:8080", "http"));
+ paramsMap =
+ Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e ->
e.split("=")[0], e -> e.split("=")[1]));
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
"foo__0__0__12345Z");
+ Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
"Server_localhost_8099");
+ Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_ROW_COUNT));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION));
+
Assert.assertNull(paramsMap.get(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET));
+
+ params = new
SegmentCompletionProtocol.Request.Params().withSegmentName("foo__0__0__12345Z")
+
.withInstanceId("Server_localhost_8099").withReason("ROW_LIMIT").withBuildTimeMillis(1000)
+
.withWaitTimeMillis(2000).withExtraTimeSec(3000).withMemoryUsedBytes(4000).withSegmentSizeBytes(5000)
+
.withNumRows(6000).withSegmentLocation("/tmp/segment").withStreamPartitionMsgOffset("7000");
+ SegmentCompletionProtocol.SegmentCommitRequest segmentCommitRequest =
+ new SegmentCompletionProtocol.SegmentCommitRequest(params);
+ uri = new URI(segmentCommitRequest.getUrl("localhost:8080", "http"));
+ paramsMap =
+ Arrays.stream(uri.getQuery().split("&")).collect(Collectors.toMap(e ->
e.split("=")[0], e -> e.split("=")[1]));
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
"foo__0__0__12345Z");
+ Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
"Server_localhost_8099");
+ Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON),
"ROW_LIMIT");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS),
"1000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS),
"2000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC),
"3000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES),
"4000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES),
"5000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_ROW_COUNT),
"6000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION),
"/tmp/segment");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET),
"7000");
+
+ // test param encoding
+ params = new
SegmentCompletionProtocol.Request.Params().withSegmentName("foo%%__0__0__12345Z")
+
.withInstanceId("Server_localhost_8099").withReason("{\"type\":\"ROW_LIMIT\",
\"value\":1000}")
+
.withBuildTimeMillis(1000).withWaitTimeMillis(2000).withExtraTimeSec(3000).withMemoryUsedBytes(4000)
+
.withSegmentSizeBytes(5000).withNumRows(6000).withSegmentLocation("s3://my.bucket/segment")
+ .withStreamPartitionMsgOffset(
+
"{\"shardId-000000000001\":\"49615238429973311938200772279310862572716999467690098706\"}");
+ SegmentCompletionProtocol.SegmentCommitStartRequest
segmentCommitStartRequest =
+ new SegmentCompletionProtocol.SegmentCommitStartRequest(params);
+ String url = segmentCommitStartRequest.getUrl("localhost:8080", "http");
+ paramsMap = Arrays.stream(url.split("\\?")[1].split("&"))
+ .collect(Collectors.toMap(e -> e.split("=")[0], e -> e.split("=")[1]));
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
+ URIUtils.encode("foo%%__0__0__12345Z"));
+ Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_OFFSET),
"-1");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_INSTANCE_ID),
+ URIUtils.encode("Server_localhost_8099"));
+ Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_REASON),
+ URIUtils.encode("{\"type\":\"ROW_LIMIT\", \"value\":1000}"));
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS),
"1000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS),
"2000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_EXTRA_TIME_SEC),
"3000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES),
"4000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES),
"5000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_ROW_COUNT),
"6000");
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION),
+ URIUtils.encode("s3://my.bucket/segment"));
+
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET),
+
URIUtils.encode("{\"shardId-000000000001\":\"49615238429973311938200772279310862572716999467690098706\"}"));
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
index 229cce0..403793a 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
@@ -18,8 +18,13 @@
*/
package org.apache.pinot.common.utils;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -65,4 +70,27 @@ public class URIUtilsTest {
assertEquals(URIUtils.decode(URIUtils.encode(randomString)),
randomString);
}
}
+
+ @Test
+ public void testBuildURI() {
+ URI uri = URIUtils.buildURI("http", "foo", "bar", Collections.emptyMap());
+ Assert.assertEquals(uri.toString(), "http://foo/bar");
+
+ uri = URIUtils.buildURI("http", "foo:8080", "bar/moo",
Collections.emptyMap());
+ Assert.assertEquals(uri.toString(), "http://foo:8080/bar/moo");
+ Assert.assertEquals(uri.getHost(), "foo");
+ Assert.assertEquals(uri.getPort(), 8080);
+
+ // test that params get encoded
+ Map<String, String> params = new HashMap<>();
+ params.put("stringParam", "aString");
+ params.put("stringParamNeedsEncoding",
"{\"format\":\"JSON\",\"timeout\":1000}");
+ uri = URIUtils.buildURI("http", "foo", "bar", params);
+ Assert.assertEquals(uri.toString(),
"http://foo/bar?stringParam=aString&stringParamNeedsEncoding=" + URIUtils
+ .encode("{\"format\":\"JSON\",\"timeout\":1000}"));
+
+ // test that path gets encoded
+ uri = URIUtils.buildURI("http", "foo", "bar%moo{}",
Collections.emptyMap());
+ Assert.assertEquals(uri.toString(), "http://foo/" +
URIUtils.encode("bar%moo{}"));
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index a564542..24a6f90 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -32,10 +32,13 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.stream.PartitionCountFetcher;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +52,8 @@ public class PinotTableIdealStateBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTableIdealStateBuilder.class);
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
+ private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY =
+ RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L);
/**
*
@@ -115,14 +120,46 @@ public class PinotTableIdealStateBuilder {
pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig,
idealState);
}
- public static int getPartitionCount(StreamConfig streamConfig) {
- PartitionCountFetcher partitionCountFetcher = new
PartitionCountFetcher(streamConfig);
+ /**
+ * Fetches the list of {@link PartitionGroupMetadata} for the new partition
groups for the stream,
+ * with the help of the {@link PartitionGroupConsumptionStatus} of the
current partitionGroups.
+ *
+ * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed:
+ *
+ * 1)
+ * The current {@link PartitionGroupConsumptionStatus} is used to determine
the offsets that have been consumed for a partition group.
+ * An example of where the offsets would be used:
+ * e.g. If partition group 1 contains shardId 1, with status DONE and
endOffset 150. There's 2 possibilities:
+ * 1) the stream indicates that shardId's last offset is 200.
+ * This tells Pinot that partition group 1 still has messages which haven't
been consumed, and must be included in the response.
+ * 2) the stream indicates that shardId's last offset is 150,
+ * This tells Pinot that all messages of partition group 1 have been
consumed, and it need not be included in the response.
+ * Thus, this call will skip a partition group when it has reached end of
life and all messages from that partition group have been consumed.
+ *
+ * The current {@link PartitionGroupConsumptionStatus} is also used to know
about existing groupings of partitions,
+ * and accordingly make the new partition groups.
+ * e.g. Assume that partition group 1 has status IN_PROGRESS and contains
shards 0,1,2
+ * and partition group 2 has status DONE and contains shards 3,4.
+ * In the above example, the
<code>partitionGroupConsumptionStatusList</code> indicates that
+ * the collection of shards in partition group 1, should remain unchanged in
the response,
+ * whereas shards 3,4 can be added to new partition groups if needed.
+ *
+ * @param streamConfig the streamConfig from the tableConfig
+ * @param partitionGroupConsumptionStatusList List of {@link
PartitionGroupConsumptionStatus} for the current partition groups.
+ * The size of this list is equal
to the number of partition groups,
+ * and is created using the latest
segment zk metadata.
+ */
+ public static List<PartitionGroupMetadata>
getPartitionGroupMetadataList(StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList) {
+ PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
+ new PartitionGroupMetadataFetcher(streamConfig,
partitionGroupConsumptionStatusList);
try {
- RetryPolicies.noDelayRetryPolicy(3).attempt(partitionCountFetcher);
- return partitionCountFetcher.getPartitionCount();
+
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
+ return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
} catch (Exception e) {
- Exception fetcherException = partitionCountFetcher.getException();
- LOGGER.error("Could not get partition count for {}",
streamConfig.getTopicName(), fetcherException);
+ Exception fetcherException =
partitionGroupMetadataFetcher.getException();
+ LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of
table: {}", streamConfig.getTopicName(),
+ streamConfig.getTableNameWithType(), fetcherException);
throw new RuntimeException(fetcherException);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 20620df..b3f6e1f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
@@ -75,8 +76,9 @@ import
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -161,6 +163,48 @@ public class PinotLLCRealtimeSegmentManager {
return _controllerConf.getAcceptSplitCommit();
}
+ /**
+ * Using the ideal state and segment metadata, return a list of {@link
PartitionGroupConsumptionStatus}
+ * for latest segment of each partition group.
+ */
+ public List<PartitionGroupConsumptionStatus>
getPartitionGroupConsumptionStatusList(IdealState idealState,
+ StreamConfig streamConfig) {
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList
= new ArrayList<>();
+
+ // From all segment names in the ideal state, find unique partition group
ids and their latest segment
+ Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegment = new
HashMap<>();
+ for (String segment : idealState.getRecord().getMapFields().keySet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
+ int partitionGroupId = llcSegmentName.getPartitionGroupId();
+ partitionGroupIdToLatestSegment.compute(partitionGroupId, (k,
latestSegment) -> {
+ if (latestSegment == null) {
+ return llcSegmentName;
+ } else {
+ return latestSegment.getSequenceNumber() >
llcSegmentName.getSequenceNumber() ? latestSegment
+ : llcSegmentName;
+ }
+ });
+ }
+
+ // Create a {@link PartitionGroupConsumptionStatus} for each latest segment
+ StreamPartitionMsgOffsetFactory offsetFactory =
+
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+ for (Map.Entry<Integer, LLCSegmentName> entry :
partitionGroupIdToLatestSegment.entrySet()) {
+ int partitionGroupId = entry.getKey();
+ LLCSegmentName llcSegmentName = entry.getValue();
+ LLCRealtimeSegmentZKMetadata llRealtimeSegmentZKMetadata =
+ getSegmentZKMetadata(streamConfig.getTableNameWithType(),
llcSegmentName.getSegmentName());
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
+ new PartitionGroupConsumptionStatus(partitionGroupId,
llcSegmentName.getSequenceNumber(),
+
offsetFactory.create(llRealtimeSegmentZKMetadata.getStartOffset()),
+ llRealtimeSegmentZKMetadata.getEndOffset() == null ? null
+ :
offsetFactory.create(llRealtimeSegmentZKMetadata.getEndOffset()),
+ llRealtimeSegmentZKMetadata.getStatus().toString());
+ partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+ }
+ return partitionGroupConsumptionStatusList;
+ }
+
public String getControllerVipUrl() {
return _controllerConf.generateVipUrl();
}
@@ -213,7 +257,8 @@ public class PinotLLCRealtimeSegmentManager {
PartitionLevelStreamConfig streamConfig =
new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
- int numPartitionGroups = getNumPartitions(streamConfig);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+ int numPartitionGroups = newPartitionGroupMetadataList.size();
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -222,10 +267,10 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
- for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups;
partitionGroupId++) {
- String segmentName =
- setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId,
currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas);
+ for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ String segmentName = setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupMetadata,
+ currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
segmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -423,6 +468,8 @@ public class PinotLLCRealtimeSegmentManager {
private void commitSegmentMetadataInternal(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
String committingSegmentName =
committingSegmentDescriptor.getSegmentName();
+ LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
+ int committingSegmentPartitionGroupId =
committingLLCSegment.getPartitionGroupId();
LOGGER.info("Committing segment metadata for segment: {}",
committingSegmentName);
TableConfig tableConfig = getTableConfig(realtimeTableName);
@@ -431,7 +478,6 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions
.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
"Failed to find instance in CONSUMING state in IdealState for
segment: %s", committingSegmentName);
- int numPartitionGroups = getNumPartitionsFromIdealState(idealState);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
/*
@@ -448,14 +494,30 @@ public class PinotLLCRealtimeSegmentManager {
// Refresh the Broker routing to reflect the changes in the segment ZK
metadata
_helixResourceManager.sendSegmentRefreshMessage(realtimeTableName,
committingSegmentName, false, true);
- // Step-2
+ // Using the latest segment of each partition group, creates a list of
{@link PartitionGroupConsumptionStatus}
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+
+ // Fetches new partition groups, given current list of {@link
PartitionGroupConsumptionStatus}.
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ Set<Integer> newPartitionGroupSet =
+
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
+ int numPartitionGroups = newPartitionGroupMetadataList.size();
+
+ // Only if committingSegment's partitionGroup is present in the
newPartitionGroupMetadataList do we create new segment metadata
+ String newConsumingSegmentName = null;
+ String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
- LLCSegmentName newLLCSegmentName =
- getNextLLCSegmentName(new LLCSegmentName(committingSegmentName),
newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig,
- new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig)),
- newLLCSegmentName, newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata,
- instancePartitions, numPartitionGroups, numReplicas);
+ if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName,
committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment,
newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitionGroups, numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
+ }
// Step-3
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -471,12 +533,17 @@ public class PinotLLCRealtimeSegmentManager {
Lock lock = _idealStateUpdateLocks[lockIndex];
try {
lock.lock();
- updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newLLCSegmentName.getSegmentName(),
+ updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentName,
segmentAssignment, instancePartitionsMap);
} finally {
lock.unlock();
}
+ // TODO: also create the new partition groups here, instead of waiting
till the {@link RealtimeSegmentValidationManager} runs
+ // E.g. If current state is A, B, C, and newPartitionGroupMetadataList
contains B, C, D, E,
+ // then create metadata/idealstate entries for D, E along with the
committing partition's entries.
+ // Ensure that multiple committing segments don't create multiple new
segment metadata and ideal state entries for the same partitionGroup
+
// Trigger the metadata event notifier
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
}
@@ -545,9 +612,8 @@ public class PinotLLCRealtimeSegmentManager {
int numPartitionGroups, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
- StreamPartitionMsgOffsetFactory offsetFactory =
-
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
- StreamPartitionMsgOffset startOffset =
offsetFactory.create(committingSegmentDescriptor.getNextOffset());
+ String startOffset = committingSegmentDescriptor.getNextOffset();
+
LOGGER
.info("Creating segment ZK metadata for new CONSUMING segment: {} with
start offset: {} and creation time: {}",
segmentName, startOffset, creationTimeMs);
@@ -556,7 +622,7 @@ public class PinotLLCRealtimeSegmentManager {
newSegmentZKMetadata.setTableName(realtimeTableName);
newSegmentZKMetadata.setSegmentName(segmentName);
newSegmentZKMetadata.setCreationTime(creationTimeMs);
- newSegmentZKMetadata.setStartOffset(startOffset.toString());
+ newSegmentZKMetadata.setStartOffset(startOffset);
// Leave maxOffset as null.
newSegmentZKMetadata.setNumReplicas(numReplicas);
newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
@@ -632,24 +698,16 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
+ /**
+ * Fetches the latest state of the PartitionGroups for the stream
+ * If any partition has reached end of life, and all messages of that
partition have been consumed by the segment,
+ * that partition will be omitted from the result
+ */
@VisibleForTesting
- int getNumPartitions(StreamConfig streamConfig) {
- return PinotTableIdealStateBuilder.getPartitionCount(streamConfig);
- }
-
- @VisibleForTesting
- StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig,
OffsetCriteria offsetCriteria,
- int partitionGroupId) {
- PartitionOffsetFetcher partitionOffsetFetcher =
- new PartitionOffsetFetcher(offsetCriteria, partitionGroupId,
streamConfig);
- try {
- RetryPolicies.fixedDelayRetryPolicy(3,
1000L).attempt(partitionOffsetFetcher);
- return partitionOffsetFetcher.getOffset();
- } catch (Exception e) {
- throw new IllegalStateException(String
- .format("Failed to fetch the offset for topic: %s, partition: %s
with criteria: %s",
- streamConfig.getTopicName(), partitionGroupId, offsetCriteria),
e);
- }
+ List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig
streamConfig,
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
+ return
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
+ currentPartitionGroupConsumptionStatusList);
}
/**
@@ -696,9 +754,8 @@ public class PinotLLCRealtimeSegmentManager {
Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
for (String segmentName : segments) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- latestLLCSegmentNameMap
- .compute(llcSegmentName.getPartitionGroupId(), (partitionGroupId,
latestLLCSegmentName) -> {
- if (latestLLCSegmentName == null) {
+ latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
(partitionId, latestLLCSegmentName) -> {
+ if (latestLLCSegmentName == null) {
return llcSegmentName;
} else {
if (llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber()) {
@@ -750,11 +807,15 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
- int numPartitionGroups = getNumPartitions(streamConfig);
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState
-> {
assert idealState != null;
if (idealState.isEnabled()) {
- return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, numPartitionGroups);
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, newPartitionGroupMetadataList);
+
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}",
realtimeTableName);
return idealState;
@@ -791,7 +852,7 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String,
String>> instanceStatesMap,
- @Nullable String committingSegmentName, String newSegmentName,
SegmentAssignment segmentAssignment,
+ @Nullable String committingSegmentName, @Nullable String newSegmentName,
SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
@@ -808,24 +869,23 @@ public class PinotLLCRealtimeSegmentManager {
// These conditions can happen again due to manual operations considered
as fixes in Issues #5559 and #5263
// The following check prevents the table from going into such a state
(but does not prevent the root cause
// of attempting such a zk update).
- LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
- int partitionGroupId = newLLCSegmentName.getPartitionGroupId();
- int seqNum = newLLCSegmentName.getSequenceNumber();
- for (String segmentNameStr : instanceStatesMap.keySet()) {
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);
- if (llcSegmentName.getPartitionGroupId() == partitionGroupId &&
llcSegmentName.getSequenceNumber() == seqNum) {
- String errorMsg =
- String.format("Segment %s is a duplicate of existing segment %s",
newSegmentName, segmentNameStr);
- LOGGER.error(errorMsg);
- throw new HelixHelper.PermanentUpdaterException(errorMsg);
+ if (newSegmentName != null) {
+ LLCSegmentName newLLCSegmentName = new LLCSegmentName(newSegmentName);
+ int partitionId = newLLCSegmentName.getPartitionGroupId();
+ int seqNum = newLLCSegmentName.getSequenceNumber();
+ for (String segmentNameStr : instanceStatesMap.keySet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);
+ if (llcSegmentName.getPartitionGroupId() == partitionId &&
llcSegmentName.getSequenceNumber() == seqNum) {
+ String errorMsg = String.format("Segment %s is a duplicate of
existing segment %s", newSegmentName, segmentNameStr);
+ LOGGER.error(errorMsg);
+ throw new HelixHelper.PermanentUpdaterException(errorMsg);
+ }
}
+ // Assign instances to the new segment and add instances as state
CONSUMING
+ List<String> instancesAssigned =
segmentAssignment.assignSegment(newSegmentName, instanceStatesMap,
instancePartitionsMap);
+ instanceStatesMap.put(newSegmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
+ LOGGER.info("Adding new CONSUMING segment: {} to instances: {}",
newSegmentName, instancesAssigned);
}
- // Assign instances to the new segment and add instances as state CONSUMING
- List<String> instancesAssigned =
- segmentAssignment.assignSegment(newSegmentName, instanceStatesMap,
instancePartitionsMap);
- instanceStatesMap.put(newSegmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned,
SegmentStateModel.CONSUMING));
- LOGGER.info("Adding new CONSUMING segment: {} to instances: {}",
newSegmentName, instancesAssigned);
}
/*
@@ -897,11 +957,14 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig,
- IdealState idealState, int numPartitionGroups) {
+ IdealState idealState, List<PartitionGroupMetadata>
newPartitionGroupMetadataList) {
String realtimeTableName = tableConfig.getTableName();
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+ int numPartitions = newPartitionGroupMetadataList.size();
+ Set<Integer> newPartitionGroupSet =
+
newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -920,8 +983,8 @@ public class PinotLLCRealtimeSegmentManager {
// Possible things to repair:
// 1. The latest metadata is in DONE state, but the idealstate says
segment is CONSUMING:
// a. Create metadata for next segment and find hosts to assign it to.
- // b. update current segment in idealstate to ONLINE
- // c. add new segment in idealstate to CONSUMING on the hosts.
+ // b. update current segment in idealstate to ONLINE (only if partition
is present in newPartitionGroupMetadata)
+ // c. add new segment in idealstate to CONSUMING on the hosts (only if
partition is present in newPartitionGroupMetadata)
// 2. The latest metadata is IN_PROGRESS, but segment is not there in
idealstate.
// a. change prev segment to ONLINE in idealstate
// b. add latest segment to CONSUMING in idealstate.
@@ -938,7 +1001,7 @@ public class PinotLLCRealtimeSegmentManager {
Map<String, String> instanceStateMap =
instanceStatesMap.get(latestSegmentName);
if (instanceStateMap != null) {
// Latest segment of metadata is in idealstate.
- if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
// step-1 of commitSegmentMetadata is done (i.e. marking old
segment as DONE)
@@ -947,17 +1010,26 @@ public class PinotLLCRealtimeSegmentManager {
if (!isExceededMaxSegmentCompletionTime(realtimeTableName,
latestSegmentName, currentTimeMs)) {
continue;
}
- LOGGER.info("Repairing segment: {} which is DONE in segment ZK
metadata, but is CONSUMING in IdealState",
- latestSegmentName);
-
- LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
- String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(latestSegmentName,
-
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
- createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitionGroups, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
- segmentAssignment, instancePartitionsMap);
+ if (newPartitionGroupSet.contains(partitionGroupId)) {
+ LOGGER.info("Repairing segment: {} which is DONE in segment ZK
metadata, but is CONSUMING in IdealState",
+ latestSegmentName);
+
+ LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+ String newSegmentName = newLLCSegmentName.getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(latestSegmentName,
+
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+ createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
+ committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
+ } else { // partition group reached end of life
+ LOGGER.info(
+ "PartitionGroup: {} has reached end of life. Updating ideal
state for segment: {}. "
+ + "Skipping creation of new ZK metadata and new segment
in ideal state",
+ partitionGroupId, latestSegmentName);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, null, segmentAssignment,
+ instancePartitionsMap);
+ }
}
// else, the metadata should be IN_PROGRESS, which is the right
state for a consuming segment.
} else { // no replica in CONSUMING state
@@ -972,29 +1044,33 @@ public class PinotLLCRealtimeSegmentManager {
// Create a new segment to re-consume from the previous start
offset
LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
StreamPartitionMsgOffset startOffset =
offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+ StreamPartitionMsgOffset partitionGroupSmallestOffset =
+ getPartitionGroupSmallestOffset(streamConfig,
partitionGroupId);
+
// Start offset must be higher than the start offset of the stream
- StreamPartitionMsgOffset partitionStartOffset =
- getPartitionOffset(streamConfig,
OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId);
- if (partitionStartOffset.compareTo(startOffset) > 0) {
+ if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
LOGGER.error("Data lost from offset: {} to: {} for partition: {}
of table: {}", startOffset,
- partitionStartOffset, partitionGroupId, realtimeTableName);
+ partitionGroupSmallestOffset, partitionGroupId,
realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
- startOffset = partitionStartOffset;
+ startOffset = partitionGroupSmallestOffset;
}
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegmentName,
startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitionGroups, numReplicas);
+ committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
String newSegmentName = newLLCSegmentName.getSegmentName();
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
null, newSegmentName, segmentAssignment,
instancePartitionsMap);
} else {
- // If we get here, that means in IdealState, the latest segment
has no CONSUMING replicas, but has replicas
- // not OFFLINE. That is an unexpected state which cannot be fixed
by the validation manager currently. In
- // that case, we need to either extend this part to handle the
state, or prevent segments from getting into
- // such state.
- LOGGER.error("Got unexpected instance state map: {} for segment:
{}", instanceStateMap, latestSegmentName);
+ if (newPartitionGroupSet.contains(partitionGroupId)) {
+ // If we get here, that means in IdealState, the latest segment
has no CONSUMING replicas, but has replicas
+ // not OFFLINE. That is an unexpected state which cannot be
fixed by the validation manager currently. In
+ // that case, we need to either extend this part to handle the
state, or prevent segments from getting into
+ // such state.
+ LOGGER.error("Got unexpected instance state map: {} for segment:
{}", instanceStateMap, latestSegmentName);
+ }
+ // else, the partition group has reached end of life. This is an
acceptable state
}
}
} else {
@@ -1036,10 +1112,11 @@ public class PinotLLCRealtimeSegmentManager {
}
// Set up new partitions if not exist
- for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups;
partitionGroupId++) {
+ for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
- setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups,
+ setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitions,
numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
instancePartitionsMap);
@@ -1049,6 +1126,25 @@ public class PinotLLCRealtimeSegmentManager {
return idealState;
}
+ private StreamPartitionMsgOffset
getPartitionGroupSmallestOffset(StreamConfig streamConfig, int
partitionGroupId) {
+ Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new
HashMap<>(streamConfig.getStreamConfigsMap());
+ streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
+ .constructStreamProperty(streamConfig.getType(),
StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
+ OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
+ StreamConfig smallestOffsetCriteriaStreamConfig =
+ new StreamConfig(streamConfig.getTableNameWithType(),
streamConfigMapWithSmallestOffsetCriteria);
+ List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
+ getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig,
Collections.emptyList());
+ StreamPartitionMsgOffset partitionStartOffset = null;
+ for (PartitionGroupMetadata info :
smallestOffsetCriteriaPartitionGroupMetadata) {
+ if (info.getPartitionGroupId() == partitionGroupId) {
+ partitionStartOffset = info.getStartOffset();
+ break;
+ }
+ }
+ return partitionStartOffset;
+ }
+
private LLCSegmentName getNextLLCSegmentName(LLCSegmentName
lastLLCSegmentName, long creationTimeMs) {
return new LLCSegmentName(lastLLCSegmentName.getTableName(),
lastLLCSegmentName.getPartitionGroupId(),
lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
@@ -1058,41 +1154,32 @@ public class PinotLLCRealtimeSegmentManager {
* Sets up a new partition group.
* <p>Persists the ZK metadata for the first CONSUMING segment, and returns
the segment name.
*/
- private String setupNewPartitionGroup(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig,
- int partitionGroupId, long creationTimeMs, InstancePartitions
instancePartitions, int numPartitionGroups,
- int numReplicas) {
+ private String setupNewPartitionGroup(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig, PartitionGroupMetadata
partitionGroupMetadata,
+ long creationTimeMs, InstancePartitions instancePartitions, int
numPartitionGroups, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
+ int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+ String startOffset = partitionGroupMetadata.getStartOffset().toString();
LOGGER.info("Setting up new partition group: {} for table: {}",
partitionGroupId, realtimeTableName);
String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
LLCSegmentName newLLCSegmentName =
new LLCSegmentName(rawTableName, partitionGroupId,
STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- StreamPartitionMsgOffset startOffset =
- getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(),
partitionGroupId);
+
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
+ new CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
committingSegmentDescriptor, null, instancePartitions,
numPartitionGroups, numReplicas);
return newSegmentName;
}
+
@VisibleForTesting
long getCurrentTimeMs() {
return System.currentTimeMillis();
}
- private int getNumPartitionsFromIdealState(IdealState idealState) {
- int numPartitionGroups = 0;
- for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- numPartitionGroups = Math.max(numPartitionGroups, new
LLCSegmentName(segmentName).getPartitionGroupId() + 1);
- }
- }
- return numPartitionGroups;
- }
-
private int getNumReplicas(TableConfig tableConfig, InstancePartitions
instancePartitions) {
if (instancePartitions.getNumReplicaGroups() == 1) {
// Non-replica-group based
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index a6be7e8..143d35c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -1160,7 +1160,9 @@ public class SegmentCompletionManager {
* @return true if winner picked, false otherwise.
*/
private boolean isWinnerPicked(String preferredInstance, long now, final
String stopReason) {
- if (SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason) &&
_commitStateMap.size() == 1) {
+ if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
+ ||
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
+ && _commitStateMap.size() == 1) {
_winner = preferredInstance;
_winningOffset = _commitStateMap.get(preferredInstance);
return true;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 6706378..2e23327 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -102,6 +102,7 @@ public class SegmentSizeBasedFlushThresholdUpdater
implements FlushThresholdUpda
// less same characteristics at any one point in time).
// However, when we start a new table or change controller mastership, we
can have any partition completing first.
// It is best to learn the ratio as quickly as we can, so we allow any
partition to supply the value.
+ // FIXME: The stream may not have partition "0"
if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 ||
_latestSegmentRowsToSizeRatio == 0) {
if (_latestSegmentRowsToSizeRatio > 0) {
_latestSegmentRowsToSizeRatio =
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 0682986..5a63b4d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,6 +31,8 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
@@ -56,7 +59,8 @@ import
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -92,7 +96,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
@AfterClass
- public void tearDown() throws IOException {
+ public void tearDown()
+ throws IOException {
FileUtils.deleteDirectory(TEMP_DIR);
}
@@ -246,6 +251,49 @@ public class PinotLLCRealtimeSegmentManagerTest {
} catch (IllegalStateException e) {
// Expected
}
+
+ // committing segment's partitionGroupId no longer in the
newPartitionGroupMetadataList
+ List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
+
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig,
Collections.emptyList());
+ partitionGroupMetadataListWithout0.remove(0);
+ segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout0;
+
+ // Commit a segment for partition 0 - No new entries created for partition
which reached end of life
+ committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2,
CURRENT_TIME_MS).getSegmentName();
+ String committingSegmentStartOffset =
segmentManager._segmentZKMetadataMap.get(committingSegment).getStartOffset();
+ String committingSegmentEndOffset =
+ new LongMsgOffset(Long.parseLong(committingSegmentStartOffset) +
NUM_DOCS).toString();
+ committingSegmentDescriptor = new
CommittingSegmentDescriptor(committingSegment, committingSegmentEndOffset, 0L);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ int instanceStateMapSize = instanceStatesMap.size();
+ int metadataMapSize = segmentManager._segmentZKMetadataMap.size();
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
+ // No changes in the number of ideal state or zk entries
+ assertEquals(instanceStatesMap.size(), instanceStateMapSize);
+ assertEquals(segmentManager._segmentZKMetadataMap.size(), metadataMapSize);
+
+ // Verify instance states for committed segment and new consuming segment
+ committedSegmentInstanceStateMap =
instanceStatesMap.get(committingSegment);
+ assertNotNull(committedSegmentInstanceStateMap);
+ assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.ONLINE));
+
+ consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3,
CURRENT_TIME_MS).getSegmentName();
+ consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+ assertNull(consumingSegmentInstanceStateMap);
+
+ // Verify segment ZK metadata for committed segment and new consuming
segment
+ committedSegmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(committingSegment);
+ assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+ assertEquals(committedSegmentZKMetadata.getStartOffset(),
committingSegmentStartOffset);
+ assertEquals(committedSegmentZKMetadata.getEndOffset(),
committingSegmentEndOffset);
+ assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
+ assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+ assertEquals(committedSegmentZKMetadata.getIndexVersion(),
SEGMENT_VERSION);
+ assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+
+ consumingSegmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(consumingSegment);
+ assertNull(consumingSegmentZKMetadata);
}
/**
@@ -409,6 +457,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
*
* 4. MaxSegmentCompletionTime: Segment completion has 5 minutes to retry
and complete between steps 1 and 3.
* Correction: Do not correct the segments before the allowed time for
segment completion
+ *
+ * End-of-shard case:
+ * Additionally, shards of some streams may be detected as having reached
end-of-life when committing.
+ * In such cases, step 2 is skipped, and step 3 is done partially (change
committing segment state to ONLINE
+ * but don't create new segment with state CONSUMING)
+ *
+ * Scenarios:
+ * 1. Step 3 failed - we will find segment ZK metadata DONE, but ideal state
CONSUMING
+ * Correction: Since shard has ended, do not create new segment ZK metadata,
or new entry in ideal state.
+ * Simply update CONSUMING segment in ideal state to ONLINE
+ *
+ * 2. Shard which has reached EOL detected - we will find segment ZK
metadata DONE and ideal state ONLINE
+ * Correction: No repair needed. Acceptable case.
*/
@Test
public void testRepairs() {
@@ -420,12 +481,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Remove the CONSUMING segment from the ideal state for partition group 0
(step 3 failed)
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
removeNewConsumingSegment(instanceStatesMap, consumingSegment, null);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Remove the CONSUMING segment from the ideal state and segment ZK
metadata map for partition group 0 (step 2 failed)
removeNewConsumingSegment(instanceStatesMap, consumingSegment, null);
assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// 2 partitions commit segment
for (int partitionGroupId = 0; partitionGroupId < 2; partitionGroupId++) {
@@ -440,12 +501,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1,
CURRENT_TIME_MS).getSegmentName();
String latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
removeNewConsumingSegment(instanceStatesMap, consumingSegment,
latestCommittedSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Remove the CONSUMING segment from the ideal state and segment ZK
metadata map for partition group 0 (step 2 failed)
removeNewConsumingSegment(instanceStatesMap, consumingSegment,
latestCommittedSegment);
assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
/*
Test all replicas of the new segment are OFFLINE
@@ -459,12 +520,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Turn all the replicas for the CONSUMING segment to OFFLINE for
partition group 0
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Turn all the replicas for the CONSUMING segment to OFFLINE for
partition group 0 again
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1,
CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// 2 partitions commit segment
for (int partitionGroupId = 0; partitionGroupId < 2; partitionGroupId++) {
@@ -482,22 +543,51 @@ public class PinotLLCRealtimeSegmentManagerTest {
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3,
CURRENT_TIME_MS).getSegmentName();
latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2,
CURRENT_TIME_MS).getSegmentName();
removeNewConsumingSegment(instanceStatesMap, consumingSegment,
latestCommittedSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Remove the CONSUMING segment from the ideal state and segment ZK
metadata map for partition group 0 (step 2 failed)
removeNewConsumingSegment(instanceStatesMap, consumingSegment,
latestCommittedSegment);
assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Turn all the replicas for the CONSUMING segment to OFFLINE for
partition group 0
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3,
CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Turn all the replicas for the CONSUMING segment to OFFLINE for
partition group 0 again
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 4,
CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
+
+ /*
+ * End of shard cases
+ */
+ // 1 reached end of shard.
+ List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
+
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig,
Collections.emptyList());
+ partitionGroupMetadataListWithout1.remove(1);
+ segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout1;
+ // noop
+ testRepairs(segmentManager, Collections.emptyList());
+
+ // 1 commits segment - should not create new metadata or CONSUMING segment
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 1,
CURRENT_TIME_MS).getSegmentName();
+ String startOffset =
segmentManager._segmentZKMetadataMap.get(segmentName).getStartOffset();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(segmentName,
+ new LongMsgOffset(Long.parseLong(startOffset) + NUM_DOCS).toString(),
0L);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
+ // ONLINE in IS and metadata DONE, but end of shard (not present in
partition group list), so don't repair
+ testRepairs(segmentManager, Lists.newArrayList(1));
+
+ // make the last ONLINE segment of the shard as CONSUMING (failed between
step1 and 3)
+ segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout1;
+ consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 1, 1,
CURRENT_TIME_MS).getSegmentName();
+ turnNewConsumingSegmentConsuming(instanceStatesMap, consumingSegment);
+
+ // makes the IS to ONLINE, but creates no new entries, because end of
shard.
+ testRepairs(segmentManager, Lists.newArrayList(1));
}
/**
@@ -537,7 +627,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
- private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) {
+ /**
+ * Turns all instances for the segment to CONSUMING in the ideal state.
+ */
+ private void turnNewConsumingSegmentConsuming(Map<String, Map<String,
String>> instanceStatesMap,
+ String consumingSegment) {
+ Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.get(consumingSegment);
+ assertNotNull(consumingSegmentInstanceStateMap);
+ for (Map.Entry<String, String> entry :
consumingSegmentInstanceStateMap.entrySet()) {
+ entry.setValue(SegmentStateModel.CONSUMING);
+ }
+ }
+
+ private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager,
List<Integer> shardsEnded) {
Map<String, Map<String, String>> oldInstanceStatesMap =
cloneInstanceStatesMap(segmentManager._idealState.getRecord().getMapFields());
segmentManager._exceededMaxSegmentCompletionTime = false;
@@ -545,7 +647,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
verifyNoChangeToOldEntries(segmentManager, oldInstanceStatesMap);
segmentManager._exceededMaxSegmentCompletionTime = true;
segmentManager.ensureAllPartitionsConsuming();
- verifyRepairs(segmentManager);
+ verifyRepairs(segmentManager, shardsEnded);
}
/**
@@ -562,7 +664,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
- private void verifyRepairs(FakePinotLLCRealtimeSegmentManager
segmentManager) {
+ private void verifyRepairs(FakePinotLLCRealtimeSegmentManager
segmentManager, List<Integer> shardsEnded) {
Map<String, Map<String, String>> instanceStatesMap =
segmentManager._idealState.getRecord().getMapFields();
// Segments are the same for ideal state and ZK metadata
@@ -578,8 +680,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, String> instanceStateMap = entry.getValue();
// Skip segments with all instances OFFLINE
- if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) ||
instanceStateMap.containsValue(
- SegmentStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) ||
instanceStateMap
+ .containsValue(SegmentStateModel.CONSUMING)) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionsId = llcSegmentName.getPartitionGroupId();
Map<Integer, String> sequenceNumberToSegmentMap =
partitionGroupIdToSegmentsMap.get(partitionsId);
@@ -596,15 +698,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
String latestSegment = segments.get(numSegments - 1);
- // Latest segment should have CONSUMING instance but no ONLINE instance
in ideal state
Map<String, String> instanceStateMap =
instanceStatesMap.get(latestSegment);
- assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
- assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
+ if (!shardsEnded.contains(partitionGroupId)) {
+ // Latest segment should have CONSUMING instance but no ONLINE
instance in ideal state
+
assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
+ assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
+
+ // Latest segment ZK metadata should be IN_PROGRESS
+
assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(),
Status.IN_PROGRESS);
+ numSegments--;
+ }
- // Latest segment ZK metadata should be IN_PROGRESS
-
assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(),
Status.IN_PROGRESS);
+ for (int i = 0; i < numSegments; i++) {
- for (int i = 0; i < numSegments - 1; i++) {
String segmentName = segments.get(i);
// Committed segment should have all instances in ONLINE state
@@ -618,8 +724,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Verify segment start/end offset
assertEquals(segmentZKMetadata.getStartOffset(),
new LongMsgOffset(PARTITION_OFFSET.getOffset() + i * (long)
NUM_DOCS).toString());
- assertEquals(segmentZKMetadata.getEndOffset(),
- segmentManager._segmentZKMetadataMap.get(segments.get(i +
1)).getStartOffset());
+ if (shardsEnded.contains(partitionGroupId) && ((i + 1) ==
numSegments)) {
+ assertEquals(Long.parseLong(segmentZKMetadata.getEndOffset()),
+ Long.parseLong(segmentZKMetadata.getStartOffset()) + NUM_DOCS);
+ } else {
+ assertEquals(segmentZKMetadata.getEndOffset(),
+ segmentManager._segmentZKMetadataMap.get(segments.get(i +
1)).getStartOffset());
+ }
}
}
}
@@ -669,7 +780,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Test
- public void testCommitSegmentFile() throws Exception {
+ public void testCommitSegmentFile()
+ throws Exception {
PinotFSFactory.init(new PinotConfiguration());
File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
@@ -688,7 +800,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Test
- public void testSegmentAlreadyThereAndExtraneousFilesDeleted() throws
Exception {
+ public void testSegmentAlreadyThereAndExtraneousFilesDeleted()
+ throws Exception {
PinotFSFactory.init(new PinotConfiguration());
File tableDir = new File(TEMP_DIR, RAW_TABLE_NAME);
String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
@@ -716,7 +829,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Test
- public void testStopSegmentManager() throws Exception {
+ public void testStopSegmentManager()
+ throws Exception {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
segmentManager._numReplicas = 2;
segmentManager.makeTableConfig();
@@ -816,6 +930,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Integer> _segmentZKMetadataVersionMap = new HashMap<>();
IdealState _idealState;
int _numPartitions;
+ List<PartitionGroupMetadata> _partitionGroupMetadataList = null;
boolean _exceededMaxSegmentCompletionTime = false;
FakePinotLLCRealtimeSegmentManager() {
@@ -824,11 +939,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
void makeTableConfig() {
Map<String, String> streamConfigs =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
- _tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(_numReplicas)
- .setLLC(true)
- .setStreamConfigs(streamConfigs)
- .build();
+ _tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
+ .setLLC(true).setStreamConfigs(streamConfigs).build();
_streamConfig = new
PartitionLevelStreamConfig(_tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(_tableConfig));
}
@@ -848,7 +961,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
// Test hook: runs the production repair flow using this fake's partition group metadata
// (see getNewPartitionGroupMetadataList), so tests can simulate ended shards by injecting
// a custom _partitionGroupMetadataList.
public void ensureAllPartitionsConsuming() {
  ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
      getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()));
}
@Override
@@ -907,20 +1021,21 @@ public class PinotLLCRealtimeSegmentManagerTest {
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String
committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap)
{
-
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
committingSegmentName,
- newSegmentName, segmentAssignment, instancePartitionsMap);
+
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
committingSegmentName, null,
+ segmentAssignment, instancePartitionsMap);
+
updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(),
null, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
}
@Override
- int getNumPartitions(StreamConfig streamConfig) {
- return _numPartitions;
- }
-
- @Override
- LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria
offsetCriteria, int partitionGroupId) {
- // The criteria for this test should always be SMALLEST (for default
streaming config and new added partitions)
- assertTrue(offsetCriteria.isSmallest());
- return PARTITION_OFFSET;
+ List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig
streamConfig,
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
+ if (_partitionGroupMetadataList != null) {
+ return _partitionGroupMetadataList;
+ } else {
+ return IntStream.range(0, _numPartitions).mapToObj(i -> new
PartitionGroupMetadata(i, PARTITION_OFFSET))
+ .collect(Collectors.toList());
+ }
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 185bf5b..e7808de 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,7 +72,8 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.RowMetadata;
@@ -238,6 +240,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Segment end criteria
private volatile long _consumeEndTime = 0;
+ private volatile boolean _endOfPartitionGroup = false;
private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch
up to this one
private volatile boolean _shouldStop = false;
@@ -248,10 +251,11 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
private Thread _consumerThread;
private final String _streamTopic;
private final int _partitionGroupId;
+ private final PartitionGroupConsumptionStatus
_partitionGroupConsumptionStatus;
final String _clientId;
private final LLCSegmentName _llcSegmentName;
private final RecordTransformer _recordTransformer;
- private PartitionLevelConsumer _partitionLevelConsumer = null;
+ private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _streamMetadataProvider = null;
private final File _resourceTmpDir;
private final String _tableNameWithType;
@@ -304,6 +308,12 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
+ } else if (_endOfPartitionGroup) {
+ segmentLogger.info(
+ "Stopping consumption due to end of partitionGroup reached
nRows={} numRowsIndexed={}, numRowsConsumed={}",
+ _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+ _stopReason =
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+ return true;
}
return false;
@@ -380,8 +390,9 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
try {
- messageBatch = _partitionLevelConsumer
+ messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null,
_partitionLevelStreamConfig.getFetchTimeoutMillis());
+ _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
consecutiveErrorCount = 0;
} catch (TimeoutException e) {
handleTransientStreamErrors(e);
@@ -889,16 +900,16 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
private void closeStreamConsumers() {
- closePartitionLevelConsumer();
+ closePartitionGroupConsumer();
closeStreamMetadataProvider();
if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
_partitionGroupConsumerSemaphore.release();
}
}
- private void closePartitionLevelConsumer() {
+ private void closePartitionGroupConsumer() {
try {
- _partitionLevelConsumer.close();
+ _partitionGroupConsumer.close();
} catch (Exception e) {
segmentLogger.warn("Could not close stream consumer", e);
}
@@ -1130,6 +1141,11 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_llcSegmentName = llcSegmentName;
_partitionGroupId = _llcSegmentName.getPartitionGroupId();
+ _partitionGroupConsumptionStatus = new
PartitionGroupConsumptionStatus(_partitionGroupId,
_llcSegmentName.getSequenceNumber(),
+
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()),
+ _segmentZKMetadata.getEndOffset() == null ? null
+ :
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
+ _segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
_metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" +
_partitionGroupId;
@@ -1243,13 +1259,20 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// long as the partition function is not changed.
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
- int numStreamPartitions =
_streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
- if (numStreamPartitions != numPartitions) {
+ // TODO: currentPartitionGroupConsumptionStatus should be fetched
from idealState + segmentZkMetadata,
+ // so that we get back accurate partitionGroups info
+ // However this is not an issue for Kafka, since partitionGroups
never expire and every partitionGroup has a single partition
+ // Fix this before opening support for partitioning in Kinesis
+ int numPartitionGroups = _streamMetadataProvider
+ .computePartitionGroupMetadata(_clientId,
_partitionLevelStreamConfig,
+ Collections.emptyList(), /*maxWaitTimeMs=*/5000).size();
+
+ if (numPartitionGroups != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of
partitions in the partition config: {}, using number of stream partitions",
- numStreamPartitions, numPartitions);
+ numPartitionGroups, numPartitions);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
- numPartitions = numStreamPartitions;
+ numPartitions = numPartitionGroups;
}
} catch (Exception e) {
segmentLogger.warn(
@@ -1306,26 +1329,25 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
/**
* Creates a new stream consumer
- * @param reason
*/
private void makeStreamConsumer(String reason) {
- if (_partitionLevelConsumer != null) {
- closePartitionLevelConsumer();
+ if (_partitionGroupConsumer != null) {
+ closePartitionGroupConsumer();
}
segmentLogger.info("Creating new stream consumer, reason: {}", reason);
- _partitionLevelConsumer =
_streamConsumerFactory.createPartitionLevelConsumer(_clientId,
_partitionGroupId);
+ _partitionGroupConsumer =
_streamConsumerFactory.createPartitionGroupConsumer(_clientId,
+ _partitionGroupConsumptionStatus);
}
/**
* Creates a new stream metadata provider
- * @param reason
*/
private void makeStreamMetadataProvider(String reason) {
if (_streamMetadataProvider != null) {
closeStreamMetadataProvider();
}
segmentLogger.info("Creating new stream metadata provider, reason: {}",
reason);
- _streamMetadataProvider =
_streamConsumerFactory.createPartitionMetadataProvider(_clientId,
_partitionGroupId);
+ _streamMetadataProvider =
_streamConsumerFactory.createStreamMetadataProvider(_clientId);
}
// This should be done during commit? We may not always commit when we build
a segment....
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 39cf466..9aab4b7 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.core.data.manager.TableDataManager;
@@ -154,6 +155,7 @@ public class LLRealtimeSegmentDataManagerTest {
segmentZKMetadata.setSegmentName(_segmentNameStr);
segmentZKMetadata.setStartOffset(_startOffset.toString());
segmentZKMetadata.setCreationTime(System.currentTimeMillis());
+
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
return segmentZKMetadata;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
index e5025f6..121ae32 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
@@ -50,4 +50,9 @@ public class LongMsgOffset implements
StreamPartitionMsgOffset {
public String toString() {
return Long.toString(_offset);
}
+
+ @Override
+ public StreamPartitionMsgOffset fromString(String longOffset) {
+ return new LongMsgOffset(longOffset);
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..02c721f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -81,4 +81,11 @@ public interface MessageBatch<T> {
default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int
index) {
return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
}
+
/**
 * Returns {@code true} if the consumer has detected that no more records can ever be read from
 * this partition group (e.g. the underlying shard has reached its end-of-life).
 * Defaults to {@code false}, which suits streams whose partitions never end.
 */
default boolean isEndOfPartitionGroup() {
  return false;
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
new file mode 100644
index 0000000..ae4c135
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeoutException;
+
+
/**
 * Consumer interface for consuming from a partition group of a stream.
 */
public interface PartitionGroupConsumer extends Closeable {

  /**
   * Fetches messages and offsets from the stream partition group.
   *
   * @param startOffset The offset of the first message desired, inclusive
   * @param endOffset The offset of the last message desired, exclusive, or {@code null} for no upper bound
   * @param timeoutMs Timeout in milliseconds
   * @return An iterable containing messages fetched from the stream partition group and their offsets
   * @throws TimeoutException If the operation could not be completed within {@code timeoutMs} milliseconds
   */
  MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset,
      int timeoutMs)
      throws TimeoutException;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
new file mode 100644
index 0000000..aad103d
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+/**
+ * A PartitionGroup is a group of partitions/shards that the same consumer
should consume from.
+ * This class contains all information which describes the latest state of a
partition group.
+ * It is constructed by looking at the segment zk metadata of the latest
segment of each partition group.
+ * It consists of:
+ * 1. partitionGroupId - A unique ID for the partitionGroup
+ * 2. sequenceNumber - The sequenceNumber this partitionGroup is currently at
+ * 3. startOffset - The start offset that the latest segment started consuming
from
+ * 4. endOffset - The endOffset (if segment consuming from this partition
group has finished consuming the segment and recorded the end offset)
+ * 5. status - the consumption status IN_PROGRESS/DONE
+ *
+ * This information is needed by the stream, when grouping the
partitions/shards into new partition groups.
+ */
+public class PartitionGroupConsumptionStatus {
+
+ private final int _partitionGroupId;
+ private int _sequenceNumber;
+ private StreamPartitionMsgOffset _startOffset;
+ private StreamPartitionMsgOffset _endOffset;
+ private String _status;
+
+ public PartitionGroupConsumptionStatus(int partitionGroupId, int
sequenceNumber, StreamPartitionMsgOffset startOffset,
+ StreamPartitionMsgOffset endOffset, String status) {
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _startOffset = startOffset;
+ _endOffset = endOffset;
+ _status = status;
+ }
+
+ public int getPartitionGroupId() {
+ return _partitionGroupId;
+ }
+
+ public int getSequenceNumber() {
+ return _sequenceNumber;
+ }
+
+ public void setSequenceNumber(int sequenceNumber) {
+ _sequenceNumber = sequenceNumber;
+ }
+
+ public StreamPartitionMsgOffset getStartOffset() {
+ return _startOffset;
+ }
+
+ public void setStartOffset(StreamPartitionMsgOffset startOffset) {
+ _startOffset = startOffset;
+ }
+
+ public StreamPartitionMsgOffset getEndOffset() {
+ return _endOffset;
+ }
+
+ public void setEndOffset(StreamPartitionMsgOffset endOffset) {
+ _endOffset = endOffset;
+ }
+
+ public String getStatus() {
+ return _status;
+ }
+
+ public void setStatus(String status) {
+ _status = status;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
copy to
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index e5025f6..5bb89a4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -1,8 +1,3 @@
-package org.apache.pinot.spi.stream;
-
-import com.google.common.annotations.VisibleForTesting;
-
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,33 +16,30 @@ import com.google.common.annotations.VisibleForTesting;
* specific language governing permissions and limitations
* under the License.
*/
-public class LongMsgOffset implements StreamPartitionMsgOffset {
- private final long _offset;
-
- @VisibleForTesting
- public long getOffset() {
- return _offset;
- }
+package org.apache.pinot.spi.stream;
- public LongMsgOffset(long offset) {
- _offset = offset;
- }
+/**
+ * A PartitionGroup is a group of partitions/shards that the same consumer
should consume from.
+ * This class is a container for the metadata regarding a partition group
that is needed by a consumer to start consumption.
+ * It consists of
+ * 1. A unique partition group id for this partition group
+ * 2. The start offset to begin consumption for this partition group
+ */
+public class PartitionGroupMetadata {
- public LongMsgOffset(String offset) {
- _offset = Long.parseLong(offset);
- }
+ private final int _partitionGroupId;
+ private final StreamPartitionMsgOffset _startOffset;
- public LongMsgOffset(StreamPartitionMsgOffset other) {
- _offset = ((LongMsgOffset)other)._offset;
+ public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset
startOffset) {
+ _partitionGroupId = partitionGroupId;
+ _startOffset = startOffset;
}
- @Override
- public int compareTo(Object other) {
- return Long.compare(_offset, ((LongMsgOffset)other)._offset);
+ public int getPartitionGroupId() {
+ return _partitionGroupId;
}
- @Override
- public String toString() {
- return Long.toString(_offset);
+ public StreamPartitionMsgOffset getStartOffset() {
+ return _startOffset;
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
similarity index 58%
rename from
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
rename to
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index d523235..6cc74ce 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -18,32 +18,36 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.List;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Fetches the partition count of a stream using the {@link
StreamMetadataProvider}
+ * Fetches the list of {@link PartitionGroupMetadata} for all partition groups
of the stream,
+ * using the {@link StreamMetadataProvider}
*/
-public class PartitionCountFetcher implements Callable<Boolean> {
+public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionCountFetcher.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
- private int _partitionCount = -1;
+ private List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
private final StreamConfig _streamConfig;
- private StreamConsumerFactory _streamConsumerFactory;
+ private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
+ private final StreamConsumerFactory _streamConsumerFactory;
private Exception _exception;
private final String _topicName;
- public PartitionCountFetcher(StreamConfig streamConfig) {
- _streamConfig = streamConfig;
- _streamConsumerFactory =
StreamConsumerFactoryProvider.create(_streamConfig);
+ public PartitionGroupMetadataFetcher(StreamConfig streamConfig,
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) {
+ _streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
_topicName = streamConfig.getTopicName();
+ _streamConfig = streamConfig;
+ _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
}
- public int getPartitionCount() {
- return _partitionCount;
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
+ return _newPartitionGroupMetadataList;
}
public Exception getException() {
@@ -51,21 +55,20 @@ public class PartitionCountFetcher implements
Callable<Boolean> {
}
/**
- * Callable to fetch the number of partitions of the stream given the stream
metadata
- * @return
- * @throws Exception
+ * Callable to fetch the {@link PartitionGroupMetadata} list, from the
stream.
+ * The stream requires the list of {@link PartitionGroupConsumptionStatus}
to compute the new {@link PartitionGroupMetadata}
*/
@Override
public Boolean call()
throws Exception {
-
- String clientId = PartitionCountFetcher.class.getSimpleName() + "-" +
_topicName;
+ String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() +
"-" + _topicName;
try (
StreamMetadataProvider streamMetadataProvider =
_streamConsumerFactory.createStreamMetadataProvider(clientId)) {
- _partitionCount =
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+ _newPartitionGroupMetadataList = streamMetadataProvider
+ .computePartitionGroupMetadata(clientId, _streamConfig,
_partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000);
if (_exception != null) {
// We had at least one failure, but succeeded now. Log an info
- LOGGER.info("Successfully retrieved partition count as {} for topic
{}", _partitionCount, _topicName);
+ LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic
{}", _topicName);
}
return Boolean.TRUE;
} catch (TransientConsumerException e) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 3a0a1d2..d6d690c 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public interface PartitionLevelConsumer extends Closeable {
+public interface PartitionLevelConsumer extends Closeable,
PartitionGroupConsumer {
/**
* Is here for backward compatibility for a short time.
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
deleted file mode 100644
index 1d50160..0000000
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.concurrent.Callable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Fetches the partition offset for a stream given the offset criteria, using
the {@link StreamMetadataProvider}
- */
-public class PartitionOffsetFetcher implements Callable<Boolean> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionOffsetFetcher.class);
- private static final int STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS =
10000;
-
- private final String _topicName;
- private final OffsetCriteria _offsetCriteria;
- private final int _partitionId;
-
- private Exception _exception = null;
- private StreamPartitionMsgOffset _offset;
- private StreamConsumerFactory _streamConsumerFactory;
- StreamConfig _streamConfig;
-
- public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int
partitionId, StreamConfig streamConfig) {
- _offsetCriteria = offsetCriteria;
- _partitionId = partitionId;
- _streamConfig = streamConfig;
- _streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
- _topicName = streamConfig.getTopicName();
- }
-
- public StreamPartitionMsgOffset getOffset() {
- return _offset;
- }
-
- public Exception getException() {
- return _exception;
- }
-
- /**
- * Callable to fetch the offset of the partition given the stream metadata
and offset criteria
- * @return
- * @throws Exception
- */
- @Override
- public Boolean call()
- throws Exception {
- String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" +
_topicName + "-" + _partitionId;
- try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory
- .createPartitionMetadataProvider(clientId, _partitionId)) {
- _offset =
- streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria,
STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS);
- if (_exception != null) {
- LOGGER.info("Successfully retrieved offset({}) for stream topic {}
partition {}", _offset, _topicName,
- _partitionId);
- }
- return Boolean.TRUE;
- } catch (TransientConsumerException e) {
- LOGGER.warn("Temporary exception when fetching offset for topic {}
partition {}:{}", _topicName, _partitionId,
- e.getMessage());
- _exception = e;
- return Boolean.FALSE;
- } catch (Exception e) {
- _exception = e;
- throw e;
- }
- }
-}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 27205c9..2132a71 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -19,7 +19,6 @@
package org.apache.pinot.spi.stream;
import java.util.Set;
-import org.apache.pinot.spi.data.Schema;
/**
@@ -73,4 +72,13 @@ public abstract class StreamConsumerFactory {
public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
return new LongMsgOffsetFactory();
}
+
+ /**
+ * Creates a partition group consumer, which can fetch messages from a
partition group
+ */
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+ return createPartitionLevelConsumer(clientId,
partitionGroupConsumptionStatus.getPartitionGroupId());
+ }
+
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 557ffc4..2fc059d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,10 @@
package org.apache.pinot.spi.stream;
import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -32,7 +36,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
public interface StreamMetadataProvider extends Closeable {
/**
* Fetches the number of partitions for a topic given the stream configs
- * @param timeoutMillis
+ * @param timeoutMillis Fetch timeout
* @return
*/
int fetchPartitionCount(long timeoutMillis);
@@ -43,14 +47,47 @@ public interface StreamMetadataProvider extends Closeable {
throws java.util.concurrent.TimeoutException;
/**
* Fetches the offset for a given partition and offset criteria
- * @param offsetCriteria
- * @param timeoutMillis
- * @return
- * @throws java.util.concurrent.TimeoutException
+ * @param offsetCriteria offset criteria to fetch {@link
StreamPartitionMsgOffset}.
+ * Depends on the semantics of the stream e.g.
smallest, largest for Kafka
+ * @param timeoutMillis fetch timeout
+ * @return {@link StreamPartitionMsgOffset} based on the offset criteria
provided
+ * @throws java.util.concurrent.TimeoutException if timed out trying to
connect and fetch from stream
*/
default StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull
OffsetCriteria offsetCriteria, long timeoutMillis)
throws java.util.concurrent.TimeoutException {
long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis);
return new LongMsgOffset(offset);
}
+
+ /**
+ * Computes the list of {@link PartitionGroupMetadata} for the latest state
of the stream, using the current {@link PartitionGroupConsumptionStatus}
+ *
+ * Default behavior is the one for the Kafka stream, where each partition
group contains only one partition
+ * @param partitionGroupConsumptionStatuses list of {@link
PartitionGroupConsumptionStatus} for current partition groups
+ */
+ default List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses,
int timeoutMillis)
+ throws TimeoutException, IOException {
+ int partitionCount = fetchPartitionCount(timeoutMillis);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList = new
ArrayList<>(partitionCount);
+
+ // Add a PartitionGroupMetadata into the list, for each partition already
 present in the current state.
+ // Setting endOffset (exclusive) as the startOffset for new partition
group.
+ // If partition group is still in progress, this value will be null
+ for (PartitionGroupConsumptionStatus
currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) {
+ newPartitionGroupMetadataList.add(new
PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getPartitionGroupId(),
+ currentPartitionGroupConsumptionStatus.getEndOffset()));
+ }
+ // Add PartitionGroupMetadata for new partitions
+ // Use offset criteria from stream config
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ for (int i = partitionGroupConsumptionStatuses.size(); i < partitionCount;
i++) {
+ StreamMetadataProvider partitionMetadataProvider =
+ streamConsumerFactory.createPartitionMetadataProvider(clientId, i);
+ StreamPartitionMsgOffset streamPartitionMsgOffset =
+
partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(),
timeoutMillis);
+ newPartitionGroupMetadataList.add(new PartitionGroupMetadata(i,
streamPartitionMsgOffset));
+ }
+ return newPartitionGroupMetadataList;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
index 72654bf..cbb3d3b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
@@ -42,16 +42,12 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
public interface StreamPartitionMsgOffset extends Comparable {
/**
- * Compare this offset with another one.
- *
- * @param other
- * @return
+ * A serialized representation of the offset object as a String.
*/
- int compareTo(Object other);
+ String toString();
/**
- * A serialized representation of the offset object as a String.
- * @return
+ * Converts the string to a {@link StreamPartitionMsgOffset}
*/
- String toString();
+ StreamPartitionMsgOffset fromString(String streamPartitionMsgOffsetStr);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]