krishan1390 commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2055316372
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -475,7 +475,10 @@ protected boolean consumeLoop()
messageBatch.getMessageCount(),
messageBatch.getUnfilteredMessageCount(),
messageBatch.isEndOfPartitionGroup());
}
-    _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+    // We need to check for both endOfPartitionGroup and messageCount == 0, because
+    // endOfPartitionGroup can be true even if this is the last batch of messages (has been observed for kinesis)
+    // To process the last batch of messages, we need to set _endOfPartitionGroup to false in such a case
+    _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();
Review Comment:
This was observed after a shard split, when we try to pause and resume the
table with the smallest offset. The server doesn't process the last batch of
messages and commits the segment.
Then the controller determines that the shard is not completely consumed and
starts a new segment for the parent shard. The server goes through the same
loop, again doesn't process the last batch of messages, and commits a segment
with 0 docs. This cycle repeats.
Note that this is intermittent, depending on how kinesis responds (sometimes
kinesis can respond with isEndOfPartitionGroup false too).
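To make the failure mode concrete, here is a minimal sketch of the old versus fixed condition. `Batch` is a hypothetical stand-in, not Pinot's actual `MessageBatch` interface:

```java
// Minimal sketch of the end-of-partition-group check (hypothetical Batch
// stand-in; not Pinot's actual MessageBatch interface).
class EndOfShardCheck {
    static class Batch {
        final int messageCount;
        final boolean endOfPartitionGroup;

        Batch(int messageCount, boolean endOfPartitionGroup) {
            this.messageCount = messageCount;
            this.endOfPartitionGroup = endOfPartitionGroup;
        }
    }

    // Old behaviour: trust the flag blindly. If Kinesis returns the final
    // messages together with endOfPartitionGroup=true, those messages are
    // never processed and the segment commits early.
    static boolean oldCheck(Batch b) {
        return b.endOfPartitionGroup;
    }

    // Fixed behaviour: only treat the partition group as finished once the
    // batch is also empty, so the last non-empty batch is still consumed.
    static boolean newCheck(Batch b) {
        return b.messageCount == 0 && b.endOfPartitionGroup;
    }
}
```

With a final batch of 5 messages flagged as end-of-partition-group, `oldCheck` stops consumption before those messages are processed, while `newCheck` lets the loop drain them first.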
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -475,7 +475,10 @@ protected boolean consumeLoop()
messageBatch.getMessageCount(),
messageBatch.getUnfilteredMessageCount(),
messageBatch.isEndOfPartitionGroup());
}
-    _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
+    // We need to check for both endOfPartitionGroup and messageCount == 0, because
+    // endOfPartitionGroup can be true even if this is the last batch of messages (has been observed for kinesis)
+    // To process the last batch of messages, we need to set _endOfPartitionGroup to false in such a case
+    _endOfPartitionGroup = messageBatch.getMessageCount() == 0 && messageBatch.isEndOfPartitionGroup();
Review Comment:
Although functionally we could move this to KinesisMessageBatch, I think a
function named "isEndOfPartitionGroup()" should return true in this case,
where the message count is > 0 but the batch contains the last messages in the
shard.
Also, we would then need to duplicate the check in PulsarMessageBatch, because
it is susceptible to the same bug.
I'm fine with moving it if you still think it should be moved. Let me know.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
return new KinesisMessageBatch(List.of(), startOffset, true);
}
-    // Read records
-    rateLimitRequests();
-    GetRecordsRequest getRecordRequest =
-        GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
-    GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
+    // Read records from kinesis.
+    // Based on getRecords documentation, we might get a response with empty records but a non-null nextShardIterator.
+    // This method is also used to accurately determine if we reached end of shard. So, we need to use nextShardIterator
+    // and call getRecords again until we get non-empty records or null nextShardIterator.
+    // To prevent an infinite loop due to some bug, we will limit the number of attempts
+    GetRecordsResponse getRecordsResponse;
+    int attempts = 0;
+    while (true) {
Review Comment:
Thanks. I see that the primary purpose of the earlier change removing the
while loop was to address "Return the message batch immediately without
combining multiple of them", by removing the
"messages.size() >= _config.getNumMaxRecordsToFetch()" condition.
That particular behaviour doesn't change here, and we still won't combine
multiple batches.
The new while loop only handles the case where the message batch is empty and
the shard iterator is not null: it iterates until it finds a null shard
iterator or a non-empty message batch.
This seems like a good change, right? Although we could do this check in the
client method consumedEndOfShard(), I think adding it to
getKinesisMessageBatch() is more appropriate so that the case is handled for
other use cases too.
What do you suggest now?
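The retry behaviour described above can be sketched as follows. `RecordsResponse` is a hypothetical stand-in shaped like the AWS `GetRecordsResponse`, and `MAX_ATTEMPTS` is an assumed bound (the real code would use a config value):

```java
import java.util.List;
import java.util.function.Function;

// Sketch of the bounded re-fetch loop from the diff above (hypothetical
// stand-ins for the AWS SDK types; not the PR's actual implementation).
class BoundedFetch {
    static class RecordsResponse {
        final List<String> records;
        final String nextShardIterator;  // null signals end of shard

        RecordsResponse(List<String> records, String nextShardIterator) {
            this.records = records;
            this.nextShardIterator = nextShardIterator;
        }
    }

    static final int MAX_ATTEMPTS = 5;  // assumed bound

    // Kinesis may return empty records with a non-null nextShardIterator, so
    // an empty response alone does not mean the shard is exhausted. Keep
    // following the iterator until we see records or a null iterator, capping
    // the number of calls so a buggy stream cannot cause an infinite loop.
    static RecordsResponse fetchUntilNonEmpty(String shardIterator,
                                              Function<String, RecordsResponse> getRecords) {
        RecordsResponse response = getRecords.apply(shardIterator);
        int attempts = 1;
        while (response.records.isEmpty() && response.nextShardIterator != null
                && attempts < MAX_ATTEMPTS) {
            response = getRecords.apply(response.nextShardIterator);
            attempts++;
        }
        return response;
    }
}
```

Note that the loop never concatenates records across calls: each iteration replaces the response, so a non-empty batch is returned as-is, matching the "no combining of batches" point above.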
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -994,15 +994,17 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
@VisibleForTesting
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
Set<Integer> partitionIds = new HashSet<>();
- boolean allPartitionIdsFetched = true;
+ boolean allPartitionIdsFetched = false;
for (int i = 0; i < streamConfigs.size(); i++) {
final int index = i;
try {
        partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
            .map(partitionId -> IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId, index))
            .collect(Collectors.toSet()));
+ allPartitionIdsFetched = true;
Review Comment:
Thanks, good catch. Fixed it.
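The flag fix in the diff above can be reduced to this sketch: initialize `allPartitionIdsFetched` to false and flip it to true only after a successful fetch, so a method that throws on every stream config cannot report success. The `fetcher` parameter and the catch body are hypothetical (the real method's catch clause is not shown in the diff):

```java
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical reduction of the getPartitionIds flag fix; fetcher stands in
// for the per-stream-config getPartitionIds(streamConfig) call.
class PartitionFetchSketch {
    static boolean fetchAll(List<String> streamConfigs,
                            Function<String, Set<Integer>> fetcher,
                            Set<Integer> partitionIds) {
        boolean allPartitionIdsFetched = false;
        for (String config : streamConfigs) {
            try {
                partitionIds.addAll(fetcher.apply(config));
                allPartitionIdsFetched = true;  // only set on success
            } catch (RuntimeException e) {
                // Swallow and continue (hypothetical; mirrors the try/catch
                // shape in the diff). With the old init-to-true, reaching
                // here on every config would still leave the flag true.
            }
        }
        return allPartitionIdsFetched;
    }
}
```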
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]