Jackie-Jiang commented on code in PR #12697: URL: https://github.com/apache/pinot/pull/12697#discussion_r1538617780
########## pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java: ########## @@ -52,220 +46,112 @@ */ public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConsumer.class); - public static final long SLEEP_TIME_BETWEEN_REQUESTS = 1000L; - private final String _streamTopicName; - private final int _numMaxRecordsToFetch; - private final ExecutorService _executorService; - private final ShardIteratorType _shardIteratorType; - private final int _rpsLimit; - public KinesisConsumer(KinesisConfig kinesisConfig) { - super(kinesisConfig); - _streamTopicName = kinesisConfig.getStreamTopicName(); - _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch(); - _shardIteratorType = kinesisConfig.getShardIteratorType(); - _rpsLimit = kinesisConfig.getRpsLimit(); - _executorService = Executors.newSingleThreadExecutor(); + private int _currentSecond = 0; + private int _numRequestsInCurrentSecond = 0; + + public KinesisConsumer(KinesisConfig config) { + super(config); + LOGGER.info("Created Kinesis consumer with topic: {}, RPS limit: {}, max records per fetch: {}", + config.getStreamTopicName(), config.getRpsLimit(), config.getNumMaxRecordsToFetch()); } @VisibleForTesting - public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) { - super(kinesisConfig, kinesisClient); - _kinesisClient = kinesisClient; - _streamTopicName = kinesisConfig.getStreamTopicName(); - _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch(); - _shardIteratorType = kinesisConfig.getShardIteratorType(); - _rpsLimit = kinesisConfig.getRpsLimit(); - _executorService = Executors.newSingleThreadExecutor(); + public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) { + super(config, kinesisClient); } /** * Fetch records from the Kinesis stream between the start and end 
KinesisCheckpoint */ @Override - public KinesisRecordsBatch fetchMessages(StreamPartitionMsgOffset startCheckpoint, - StreamPartitionMsgOffset endCheckpoint, int timeoutMs) { - List<KinesisStreamMessage> recordList = new ArrayList<>(); - Future<KinesisRecordsBatch> kinesisFetchResultFuture = - _executorService.submit(() -> getResult(startCheckpoint, endCheckpoint, recordList)); - - try { - return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - kinesisFetchResultFuture.cancel(true); - return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList); - } catch (Exception e) { - return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList); - } - } - - private KinesisRecordsBatch getResult(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, - List<KinesisStreamMessage> recordList) { - KinesisPartitionGroupOffset kinesisStartCheckpoint = (KinesisPartitionGroupOffset) startOffset; - - try { - if (_kinesisClient == null) { - createConnection(); - } - - // TODO: iterate upon all the shardIds in the map - // Okay for now, since we have assumed that every partition group contains a single shard - Map<String, String> startShardToSequenceMap = kinesisStartCheckpoint.getShardToStartSequenceMap(); - Preconditions.checkState(startShardToSequenceMap.size() == 1, - "Only 1 shard per consumer supported. 
Found: %s, in startShardToSequenceMap", - startShardToSequenceMap.keySet()); - Map.Entry<String, String> startShardToSequenceNum = startShardToSequenceMap.entrySet().iterator().next(); - String shardIterator = getShardIterator(startShardToSequenceNum.getKey(), startShardToSequenceNum.getValue()); - - String kinesisEndSequenceNumber = null; - - if (endOffset != null) { - KinesisPartitionGroupOffset kinesisEndCheckpoint = (KinesisPartitionGroupOffset) endOffset; - Map<String, String> endShardToSequenceMap = kinesisEndCheckpoint.getShardToStartSequenceMap(); - Preconditions.checkState(endShardToSequenceMap.size() == 1, - "Only 1 shard per consumer supported. Found: %s, in endShardToSequenceMap", endShardToSequenceMap.keySet()); - kinesisEndSequenceNumber = endShardToSequenceMap.values().iterator().next(); - } - - String nextStartSequenceNumber; - boolean isEndOfShard = false; - long currentWindow = System.currentTimeMillis() / SLEEP_TIME_BETWEEN_REQUESTS; - int currentWindowRequests = 0; - while (shardIterator != null) { - GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); - - long requestSentTime = System.currentTimeMillis() / 1000; - GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); - - if (!getRecordsResponse.records().isEmpty()) { - getRecordsResponse.records().forEach(record -> { - recordList.add( - new KinesisStreamMessage(record.partitionKey().getBytes(StandardCharsets.UTF_8), - record.data().asByteArray(), record.sequenceNumber(), - (KinesisStreamMessageMetadata) _kinesisMetadataExtractor.extract(record), - record.data().asByteArray().length)); - }); - nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - - if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(nextStartSequenceNumber) <= 0) { - break; - } - - if (recordList.size() >= _numMaxRecordsToFetch) { - break; - } - } - - if (getRecordsResponse.hasChildShards() && 
!getRecordsResponse.childShards().isEmpty()) { - //This statement returns true only when end of current shard has reached. - // hasChildShards only checks if the childShard is null and is a valid instance. - isEndOfShard = true; - break; - } - - shardIterator = getRecordsResponse.nextShardIterator(); - - if (Thread.interrupted()) { - break; - } - - // Kinesis enforces a limit of 5 .getRecords request per second on each shard from AWS end - // Beyond this limit we start getting ProvisionedThroughputExceededException which affect the ingestion - if (requestSentTime == currentWindow) { - currentWindowRequests++; - } else if (requestSentTime > currentWindow) { - currentWindow = requestSentTime; - currentWindowRequests = 0; - } - - if (currentWindowRequests >= _rpsLimit) { - try { - Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS); - } catch (InterruptedException e) { - LOGGER.debug("Sleep interrupted while rate limiting Kinesis requests", e); - break; - } + public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { Review Comment: Good question, and we cannot. The `timeoutMs` is for the request, and since the Kinesis consume request doesn't take a timeout, we should assume the request can fail when something goes wrong. IMO using a separate executor just to enforce a timeout is bad design, and can cause other multi-threading issues (e.g. we need to use a concurrent list to store the messages). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org