This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to
refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new 666c220 Don't create new CONSUMING segment if shard has reached end of
life
666c220 is described below
commit 666c220f1a72124e003b2f9d15b3c087fec2a31f
Author: Neha Pawar <[email protected]>
AuthorDate: Thu Jan 7 16:07:16 2021 -0800
Don't create new CONSUMING segment if shard has reached end of life
---
.../protocols/SegmentCompletionProtocol.java | 1 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 101 ++++++++++-----------
.../RealtimeSegmentValidationManager.java | 2 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 3 +-
.../realtime/LLRealtimeSegmentDataManager.java | 13 ++-
.../plugin/stream/kinesis/KinesisConsumer.java | 8 +-
.../stream/kinesis/KinesisConsumerFactory.java | 2 +-
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 9 +-
.../kinesis/KinesisStreamMetadataProvider.java | 48 +++++++---
.../org/apache/pinot/spi/stream/MessageBatch.java | 7 ++
10 files changed, 115 insertions(+), 79 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..74614df 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,7 @@ public class SegmentCompletionProtocol {
public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason
sent by server as max num rows reached
public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason
sent by server as max time reached
+ public static final String REASON_END_OF_PARTITION_GROUP =
"endOfPartitionGroup"; // Stop reason sent by server as end of partitionGroup
reached
// Canned responses
public static final Response RESP_NOT_LEADER =
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d5578a3..8bf9cd0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -477,7 +477,6 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions
.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
"Failed to find instance in CONSUMING state in IdealState for
segment: %s", committingSegmentName);
- int numPartitions = getNumPartitionsFromIdealState(idealState);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
/*
@@ -496,18 +495,21 @@ public class PinotLLCRealtimeSegmentManager {
// Step-2
- // Say we currently were consuming from 3 shards A, B, C. Of those, A is
the one committing. Also suppose that new partition D has come up
+ // Say we currently were consuming from 2 shards A, B. Of those, A is the
one committing.
- // get current partition groups - this gives current state of latest
segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+ // get current partition groups - this gives current state of latest
segments for each partition [A - DONE], [B - IN_PROGRESS]
List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
getCurrentPartitionGroupMetadataList(idealState);
PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- // find new partition groups [A],[B],[C],[D]
+ // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+ // If segment has consumed all of A, we will receive B,C,D
+ // If segment is still not reached last msg of A, we will receive A,B,C,D
List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig,
currentPartitionGroupMetadataList);
+ int numPartitions = newPartitionGroupInfoList.size();
- // create new segment metadata, only if it is not IN_PROGRESS in the
current state
+ // create new segment metadata, only if PartitionGroupInfo was returned
for it in the newPartitionGroupInfoList
Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata =
currentPartitionGroupMetadataList.stream().collect(
Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
@@ -519,36 +521,25 @@ public class PinotLLCRealtimeSegmentManager {
PartitionGroupMetadata currentPartitionGroupMetadata =
currentGroupIdToMetadata.get(newPartitionGroupId);
if (currentPartitionGroupMetadata == null) { // not present in current
state. New partition found.
// make new segment
- // FIXME: flushThreshold of segment is actually (configured
threshold/numPartitions)
- // In Kinesis, with every split/merge, we get new partitions, and an
old partition gets deactivated.
- // However, the getPartitionGroupInfo call returns ALL shards,
regardless of whether they're active or not.
- // So our numPartitions will forever keep increasing.
- // TODO: can the getPartitionGroupInfo return the active partitions
only, based on the checkpoints passed in current?
+ // fixme: letting validation manager do this would be best, otherwise
we risk creating multiple CONSUMING segments
String newLLCSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupInfo, newSegmentCreationTimeMs,
instancePartitions, numPartitions, numReplicas);
newConsumingSegmentNames.add(newLLCSegmentName);
} else {
- String currentStatus = currentPartitionGroupMetadata.getStatus();
- if (!currentStatus.equals(Status.IN_PROGRESS.toString())) {
- // not IN_PROGRESS anymore in current state. Should be DONE.
- // This should ONLY happen for the committing segment's partition.
Need to trigger new consuming segment
- // todo: skip this if the partition doesn't match with the
committing segment?
+ LLCSegmentName committingLLCSegment = new
LLCSegmentName(committingSegmentName);
+ // Update this only for committing segment. All other partitions
should get updated by their own commit call
+ if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId())
{
+
Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName,
newPartitionGroupId,
currentPartitionGroupMetadata.getSequenceNumber() + 1,
newSegmentCreationTimeMs);
createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-
- // FIXME: a new CONSUMING segment is created even if EOL for this
shard has been reached.
- // the logic in getPartitionGroupInfo to prevent returning of EOLed
shards isn't working
- // OPTION: Since consumer knows about it, it can pass param in
request/committingSegmentDescriptor "isEndOfShard"
- // We can set that in metadata for validation manager to skip these
partitions
}
}
}
-
// Step-3
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -840,8 +831,9 @@ public class PinotLLCRealtimeSegmentManager {
if (idealState.isEnabled()) {
List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
getCurrentPartitionGroupMetadataList(idealState);
- int numPartitions = getPartitionGroupInfoList(streamConfig,
currentPartitionGroupMetadataList).size();
- return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, numPartitions);
+ List<PartitionGroupInfo> newPartitionGroupInfoList =
+ getPartitionGroupInfoList(streamConfig,
currentPartitionGroupMetadataList);
+ return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, newPartitionGroupInfoList);
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}",
realtimeTableName);
return idealState;
@@ -969,11 +961,14 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig,
- IdealState idealState, int numPartitions) {
+ IdealState idealState, List<PartitionGroupInfo>
newPartitionGroupInfoList) {
String realtimeTableName = tableConfig.getTableName();
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+ int numPartitions = newPartitionGroupInfoList.size();
+ Set<Integer> newPartitionGroupSet =
+
newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -1010,7 +1005,7 @@ public class PinotLLCRealtimeSegmentManager {
Map<String, String> instanceStateMap =
instanceStatesMap.get(latestSegmentName);
if (instanceStateMap != null) {
// Latest segment of metadata is in idealstate.
- if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
// step-1 of commmitSegmentMetadata is done (i.e. marking old
segment as DONE)
@@ -1021,15 +1016,23 @@ public class PinotLLCRealtimeSegmentManager {
}
LOGGER.info("Repairing segment: {} which is DONE in segment ZK
metadata, but is CONSUMING in IdealState",
latestSegmentName);
-
- LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
- String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(latestSegmentName,
-
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
- createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
- segmentAssignment, instancePartitionsMap);
+ if (newPartitionGroupSet.contains(partitionGroupId)) {
+ LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+ String newSegmentName = newLLCSegmentName.getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(latestSegmentName,
+
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+ createNewSegmentZKMetadata(tableConfig, streamConfig,
newLLCSegmentName, currentTimeMs,
+ committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
+ } else { // partition group reached end of life
+ LOGGER.info(
+ "PartitionGroup: {} has reached end of life. Updating ideal
state for segment: {}. "
+ + "Skipping creation of new ZK metadata and new segment
in ideal state",
+ partitionGroupId, latestSegmentName);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
latestSegmentName, null, segmentAssignment,
+ instancePartitionsMap);
+ }
}
// else, the metadata should be IN_PROGRESS, which is the right
state for a consuming segment.
} else { // no replica in CONSUMING state
@@ -1062,11 +1065,14 @@ public class PinotLLCRealtimeSegmentManager {
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
null, newSegmentName, segmentAssignment,
instancePartitionsMap);
} else {
- // If we get here, that means in IdealState, the latest segment
has no CONSUMING replicas, but has replicas
- // not OFFLINE. That is an unexpected state which cannot be fixed
by the validation manager currently. In
- // that case, we need to either extend this part to handle the
state, or prevent segments from getting into
- // such state.
- LOGGER.error("Got unexpected instance state map: {} for segment:
{}", instanceStateMap, latestSegmentName);
+ if (!newPartitionGroupSet.contains(partitionGroupId)) {
+ // If we get here, that means in IdealState, the latest segment
has no CONSUMING replicas, but has replicas
+ // not OFFLINE. That is an unexpected state which cannot be
fixed by the validation manager currently. In
+ // that case, we need to either extend this part to handle the
state, or prevent segments from getting into
+ // such state.
+ LOGGER.error("Got unexpected instance state map: {} for segment:
{}", instanceStateMap, latestSegmentName);
+ }
+ // else, the partition group has reached end of life. This is an
acceptable state
}
}
} else {
@@ -1108,10 +1114,7 @@ public class PinotLLCRealtimeSegmentManager {
}
// Set up new partitions if not exist
- List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
getCurrentPartitionGroupMetadataList(idealState);
- List<PartitionGroupInfo> partitionGroupInfoList =
- getPartitionGroupInfoList(streamConfig,
currentPartitionGroupMetadataList);
- for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) {
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
@@ -1159,18 +1162,6 @@ public class PinotLLCRealtimeSegmentManager {
return System.currentTimeMillis();
}
- // fixme: investigate if this should only return active partitions (i.e.
skip a shard if it has reached eol)
- // or return all unique partitions found in ideal state right from the
birth of the table
- private int getNumPartitionsFromIdealState(IdealState idealState) {
- Set<String> uniquePartitions = new HashSet<>();
- for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- uniquePartitions.add(String.valueOf(new
LLCSegmentName(segmentName).getPartitionGroupId()));
- }
- }
- return uniquePartitions.size();
- }
-
private int getNumReplicas(TableConfig tableConfig, InstancePartitions
instancePartitions) {
if (instancePartitions.getNumReplicaGroups() == 1) {
// Non-replica-group based
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index d611433..96604dd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
LeadControllerManager leadControllerManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics, ControllerMetrics
controllerMetrics) {
super("RealtimeSegmentValidationManager",
config.getRealtimeSegmentValidationFrequencyInSeconds(),
- config.getRealtimeSegmentValidationManagerInitialDelaySeconds(),
pinotHelixResourceManager,
+ 6000, pinotHelixResourceManager,
leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 75c8057..0f33556 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -850,7 +850,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
public void ensureAllPartitionsConsuming() {
- ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
_numPartitions);
+ ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
+ getPartitionGroupInfoList(_streamConfig, Collections.emptyList()));
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7d82b4e..40b49b8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -240,6 +240,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Segment end criteria
private volatile long _consumeEndTime = 0;
private Checkpoint _finalOffset; // Used when we want to catch up to this one
+ private boolean _endOfPartitionGroup = false;
private volatile boolean _shouldStop = false;
// It takes 30s to locate controller leader, and more if there are multiple
controller failures.
@@ -305,6 +306,13 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
+ } else if (_endOfPartitionGroup) {
+ segmentLogger.info("Stopping consumption due to end of
partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
+ _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+ _stopReason =
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+ // fixme: what happens if reached endOfPartitionGroup but
numDocsIndexed == 0
+ // If we decide to only setupNewPartitions via ValidationManager,
we don't need commit on endOfShard
+ return true;
}
return false;
@@ -383,6 +391,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null,
_partitionLevelStreamConfig.getFetchTimeoutMillis());
+ _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
consecutiveErrorCount = 0;
} catch (TransientConsumerException e) {
handleTransientStreamErrors(e);
@@ -1239,9 +1248,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// long as the partition function is not changed.
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
- // fixme: get this from ideal state
- int numStreamPartitions = _streamMetadataProvider
- .getPartitionGroupInfoList(_clientId,
_partitionLevelStreamConfig, Collections.emptyList(), 5000).size();
+ int numStreamPartitions =
_streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
if (numStreamPartitions != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of
partitions in the partition config: {}, using number of stream partitions",
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 70d2c8a..5cbd7e6 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -125,8 +125,7 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
if (nextStartSequenceNumber == null && recordList.size() > 0) {
nextStartSequenceNumber = recordList.get(recordList.size() -
1).sequenceNumber();
}
-
- return new KinesisRecordsBatch(recordList, next.getKey());
+ return new KinesisRecordsBatch(recordList, next.getKey(), isEndOfShard);
} catch (IllegalStateException e) {
LOG.warn("Illegal state exception, connection is broken", e);
return handleException(kinesisStartCheckpoint, recordList);
@@ -158,10 +157,9 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
Map<String, String> newCheckpoint = new
HashMap<>(start.getShardToStartSequenceMap());
newCheckpoint.put(newCheckpoint.keySet().iterator().next(),
nextStartSequenceNumber);
- return new KinesisRecordsBatch(recordList, shardId);
+ return new KinesisRecordsBatch(recordList, shardId, false);
} else {
- return new KinesisRecordsBatch(recordList, shardId);
-
+ return new KinesisRecordsBatch(recordList, shardId, false);
}
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 631f240..fc9c4af 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -48,7 +48,7 @@ public class KinesisConsumerFactory extends
StreamConsumerFactory {
@Override
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
- return new KinesisStreamMetadataProvider(clientId, new
KinesisConfig(_streamConfig));
+ return new KinesisStreamMetadataProvider(clientId, _streamConfig);
}
@Override
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index fdc883b..b3eb626 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -32,10 +32,12 @@ import software.amazon.awssdk.services.kinesis.model.Record;
public class KinesisRecordsBatch implements MessageBatch<byte[]> {
private final List<Record> _recordList;
private final String _shardId;
+ private final boolean _endOfShard;
- public KinesisRecordsBatch(List<Record> recordList, String shardId) {
+ public KinesisRecordsBatch(List<Record> recordList, String shardId, boolean
endOfShard) {
_recordList = recordList;
_shardId = shardId;
+ _endOfShard = endOfShard;
}
@Override
@@ -68,4 +70,9 @@ public class KinesisRecordsBatch implements
MessageBatch<byte[]> {
public long getNextStreamMessageOffsetAtIndex(int index) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isEndOfPartitionGroup() {
+ return _endOfShard;
+ }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 6c55a18..8968b56 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -1,27 +1,45 @@
package org.apache.pinot.plugin.stream.kinesis;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import software.amazon.awssdk.services.kinesis.model.Shard;
public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
private final KinesisConnectionHandler _kinesisConnectionHandler;
+ private final StreamConsumerFactory _kinesisStreamConsumerFactory;
+ private final String _clientId;
+ private final int _fetchTimeoutMs;
- public KinesisStreamMetadataProvider(String clientId, KinesisConfig
kinesisConfig) {
+ public KinesisStreamMetadataProvider(String clientId, StreamConfig
streamConfig) {
+ KinesisConfig kinesisConfig = new KinesisConfig(streamConfig);
_kinesisConnectionHandler = new
KinesisConnectionHandler(kinesisConfig.getStream(),
kinesisConfig.getAwsRegion());
+ _kinesisStreamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ _clientId = clientId;
+ _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
}
@Override
@@ -37,7 +55,7 @@ public class KinesisStreamMetadataProvider implements
StreamMetadataProvider {
@Override
public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId,
StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int
timeoutMillis)
- throws IOException {
+ throws IOException, TimeoutException {
Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId,
p -> p));
@@ -45,10 +63,12 @@ public class KinesisStreamMetadataProvider implements
StreamMetadataProvider {
List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
List<Shard> shards = _kinesisConnectionHandler.getShards();
for (Shard shard : shards) { // go over all shards
+ KinesisCheckpoint newStartCheckpoint;
+
String shardId = shard.shardId();
int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
PartitionGroupMetadata currentPartitionGroupMetadata =
currentPartitionGroupMap.get(partitionGroupId);
- KinesisCheckpoint newStartCheckpoint;
+
if (currentPartitionGroupMetadata != null) { // existing shard
KinesisCheckpoint currentEndCheckpoint = null;
try {
@@ -59,15 +79,18 @@ public class KinesisStreamMetadataProvider implements
StreamMetadataProvider {
if (currentEndCheckpoint != null) { // end checkpoint available i.e.
committing segment
String endingSequenceNumber =
shard.sequenceNumberRange().endingSequenceNumber();
if (endingSequenceNumber != null) { // shard has ended
- // FIXME: this logic is not working
- // was expecting sequenceNumOfLastMsgInShard ==
endSequenceNumOfShard.
- // But it is much lesser than the endSeqNumOfShard
- Map<String, String> shardToSequenceNumberMap = new HashMap<>();
- shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
- KinesisCheckpoint shardEndCheckpoint = new
KinesisCheckpoint(shardToSequenceNumberMap);
- if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
- // shard has ended AND we have reached the end checkpoint.
- // skip this partition group in the result
+ // check if segment has consumed all the messages already
+ PartitionGroupConsumer partitionGroupConsumer =
+
_kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId,
currentPartitionGroupMetadata);
+
+ MessageBatch messageBatch;
+ try {
+ messageBatch =
partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null,
_fetchTimeoutMs);
+ } finally {
+ partitionGroupConsumer.close();
+ }
+ if (messageBatch.isEndOfPartitionGroup()) {
+ // shard has ended. Skip it from results
continue;
}
}
@@ -80,6 +103,7 @@ public class KinesisStreamMetadataProvider implements
StreamMetadataProvider {
shardToSequenceNumberMap.put(shardId,
shard.sequenceNumberRange().startingSequenceNumber());
newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
}
+
newPartitionGroupInfos
.add(new PartitionGroupInfo(partitionGroupId,
newStartCheckpoint.serialize()));
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..02c721f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -81,4 +81,11 @@ public interface MessageBatch<T> {
default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int
index) {
return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
}
+
+ /**
+ * Returns true if end of the consumer detects that no more records can be
read from this partition group for good
+ */
+ default boolean isEndOfPartitionGroup() {
+ return false;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]