KKcorps commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2054663147
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
return new KinesisMessageBatch(List.of(), startOffset, true);
}
- // Read records
- rateLimitRequests();
- GetRecordsRequest getRecordRequest =
-     GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
- GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
+ // Read records from kinesis.
+ // Based on getRecords documentation, we might get a response with empty records but a non-null nextShardIterator.
+ // This method is also used to accurately determine if we reached end of shard. So, we need to use nextShardIterator
+ // and call getRecords again until we get non-empty records or null nextShardIterator.
+ // To prevent an infinite loop due to some bug, we will limit the number of attempts
+ GetRecordsResponse getRecordsResponse;
+ int attempts = 0;
+ while (true) {
Review Comment:
Let's remove this. We had this while-loop code a while back but chose to
remove it in #12806.
We want the consumer thread to handle empty-record responses.
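
To illustrate the shape being asked for, here is a minimal sketch, not the actual `KinesisConsumer` code. `Response`, `Batch`, `fetchOnce`, and `consumeAll` are hypothetical stand-ins for the SDK response, `KinesisMessageBatch`, the fetch method, and the consumer thread. It assumes each fetch makes exactly one `getRecords` call, returns the batch even when it is empty, and marks end of shard only when the next shard iterator is null. The caller's own loop then makes the next call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SingleFetchSketch {
  /** Hypothetical stand-in for the SDK's GetRecordsResponse. */
  static final class Response {
    final List<String> records;
    final String nextShardIterator; // assumed null once the shard is closed and fully read

    Response(List<String> records, String nextShardIterator) {
      this.records = records;
      this.nextShardIterator = nextShardIterator;
    }
  }

  /** Hypothetical stand-in for KinesisMessageBatch. */
  static final class Batch {
    final List<String> messages;
    final String nextIterator;
    final boolean endOfShard;

    Batch(List<String> messages, String nextIterator, boolean endOfShard) {
      this.messages = messages;
      this.nextIterator = nextIterator;
      this.endOfShard = endOfShard;
    }
  }

  /** Makes exactly one getRecords call; there is no retry loop at this level. */
  static Batch fetchOnce(Function<String, Response> getRecords, String shardIterator) {
    Response response = getRecords.apply(shardIterator);
    // Empty records with a non-null next iterator is a normal response, not end of shard.
    boolean endOfShard = response.nextShardIterator == null;
    return new Batch(response.records, response.nextShardIterator, endOfShard);
  }

  /** Stand-in for the consumer thread: it owns the loop and tolerates empty batches. */
  static List<String> consumeAll(Function<String, Response> getRecords, String startIterator) {
    List<String> consumed = new ArrayList<>();
    String iterator = startIterator;
    while (iterator != null) {
      Batch batch = fetchOnce(getRecords, iterator);
      consumed.addAll(batch.messages); // adding an empty batch is a no-op
      iterator = batch.nextIterator;   // a real consumer would also rate-limit between calls
    }
    return consumed;
  }

  public static void main(String[] args) {
    // Fake shard: an empty response, then data, then an empty response that closes the shard.
    Map<String, Response> shard = Map.of(
        "it-0", new Response(List.of(), "it-1"),
        "it-1", new Response(List.of("a", "b"), "it-2"),
        "it-2", new Response(List.of(), null));
    System.out.println(consumeAll(shard::get, "it-0"));
  }
}
```

With this split, an empty response costs one extra pass through the caller's loop rather than a nested retry loop with its own attempt limit.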