KKcorps commented on code in PR #15563:
URL: https://github.com/apache/pinot/pull/15563#discussion_r2059576964
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -93,11 +93,31 @@ private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset s
 return new KinesisMessageBatch(List.of(), startOffset, true);
 }
- // Read records
- rateLimitRequests();
- GetRecordsRequest getRecordRequest =
-     GetRecordsRequest.builder().shardIterator(shardIterator).limit(_config.getNumMaxRecordsToFetch()).build();
- GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordRequest);
+ // Read records from kinesis.
+ // Based on getRecords documentation, we might get a response with empty records but a non-null nextShardIterator.
+ // This method is also used to accurately determine if we reached end of shard. So, we need to use nextShardIterator
+ // and call getRecords again until we get non-empty records or null nextShardIterator.
+ // To prevent an infinite loop due to some bug, we will limit the number of attempts
+ GetRecordsResponse getRecordsResponse;
+ int attempts = 0;
+ while (true) {
Review Comment:
No, the other reason we did it this way was that it led to confusion when
debugging via logs, where it was unclear whether the consumer thread or the
KinesisConsumer object was retrying. I'd prefer keeping this the old way.
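For readers following the thread, the bounded retry described in the diff comments (keep calling getRecords with nextShardIterator until the records are non-empty or the iterator is null, capped at a maximum number of attempts) can be sketched as below. This is a minimal, SDK-free sketch under stated assumptions: `Page`, `ReadResult`, `fetch`, and `maxAttempts` are hypothetical stand-ins, not the AWS SDK `GetRecordsResponse`/`KinesisClient` or the Pinot API, and it takes no position on whether this loop belongs in KinesisConsumer or in the consumer thread.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, SDK-free sketch of the bounded getRecords loop described in the diff comments.
// Page stands in for GetRecordsResponse and `fetch` stands in for _kinesisClient.getRecords(...);
// neither is the real AWS SDK or Pinot API.
public class BoundedShardReader {

  // Hypothetical page: records returned plus the next shard iterator (null once the shard is fully read).
  public record Page(List<String> records, String nextShardIterator) { }

  // Hypothetical result: records read, whether end of shard was reached, fetches used,
  // and the iterator to resume from on the next call (null at end of shard).
  public record ReadResult(List<String> records, boolean endOfShard, int attempts, String nextShardIterator) { }

  // Follows nextShardIterator until a page has records, the iterator is null (end of shard),
  // or maxAttempts fetches have been made, so a bug cannot cause an infinite loop.
  public static ReadResult read(String shardIterator, Function<String, Page> fetch, int maxAttempts) {
    String iterator = shardIterator;
    int attempts = 0;
    while (true) {
      Page page = fetch.apply(iterator);
      attempts++;
      String next = page.nextShardIterator();
      if (!page.records().isEmpty()) {
        // Data found: return it; a null next iterator also means the shard is exhausted.
        return new ReadResult(page.records(), next == null, attempts, next);
      }
      if (next == null) {
        // Empty page with no next iterator: end of shard reached.
        return new ReadResult(List.of(), true, attempts, null);
      }
      if (attempts >= maxAttempts) {
        // Attempt cap hit: nothing returned yet, end of shard still unknown; caller resumes from `next`.
        return new ReadResult(List.of(), false, attempts, next);
      }
      iterator = next;
    }
  }

  public static void main(String[] args) {
    // Simulated shard: two empty pages, then a page with data, then the shard ends.
    Map<String, Page> shard = Map.of(
        "it-0", new Page(List.of(), "it-1"),
        "it-1", new Page(List.of(), "it-2"),
        "it-2", new Page(List.of("a", "b"), null));
    ReadResult result = read("it-0", shard::get, 10);
    // prints "[a, b] endOfShard=true attempts=3"
    System.out.println(result.records() + " endOfShard=" + result.endOfShard() + " attempts=" + result.attempts());
  }
}
```

Returning the attempt count (rather than only logging it inside the reader) leaves the caller free to decide where retries are logged, which is the kind of log ambiguity raised in the review comment.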
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.