AHeise commented on code in PR #70: URL: https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1671715837
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
         return props;
     }
 
+    public long getPendingRecordsCount() {
+        return pendingRecords.get();
+    }
+
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
-        return super.send(record, callback);
+        pendingRecords.incrementAndGet();
+        return super.send(record, new TrackingCallback(callback));

Review Comment:
   I can't quite follow. I was proposing to use
   `return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));`

   So we have 3 cases:
   - New callback: wrap it in a `TrackingCallback` and cache it.
   - Existing callback (common case): retrieve the cached `TrackingCallback` and use it.
   - Cache is full: remove an existing `TrackingCallback` from the cache.

   In all cases, both the `TrackingCallback` and the original callback will be invoked. The only difference from the code without the cache is that we avoid creating extra `TrackingCallback` instances around the same original callback.
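The three cases above can be sketched in isolation as follows. This is only a minimal illustration, not the code in the PR: the `Callback` interface is a hypothetical stand-in for Kafka's producer callback so the example compiles without the Kafka client, `MAX_CACHED_CALLBACKS` and the `wrap` helper are made-up names, and the eviction policy (drop the oldest inserted entry via `LinkedHashMap.removeEldestEntry`) is an assumption, since the comment does not say which entry should be removed. The map is not thread-safe, so the sketch also assumes `wrap` is only called from a single thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class CallbackCacheSketch {

    /** Hypothetical stand-in for Kafka's producer Callback interface. */
    interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    /** Hypothetical cache bound; kept tiny so eviction is easy to observe. */
    static final int MAX_CACHED_CALLBACKS = 2;

    final AtomicLong pendingRecords = new AtomicLong();

    /** Insertion-ordered map that drops its oldest entry once the bound is exceeded. */
    final Map<Callback, TrackingCallback> callbackCache =
            new LinkedHashMap<Callback, TrackingCallback>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Callback, TrackingCallback> eldest) {
                    return size() > MAX_CACHED_CALLBACKS;
                }
            };

    /** Decrements the pending counter, then invokes the original callback (if any). */
    class TrackingCallback implements Callback {
        private final Callback delegate;

        TrackingCallback(Callback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            pendingRecords.decrementAndGet();
            if (delegate != null) {
                delegate.onCompletion(metadata, exception);
            }
        }
    }

    /** Counts one pending record and returns a wrapper, reusing the cached one when present. */
    Callback wrap(Callback callback) {
        pendingRecords.incrementAndGet();
        return callbackCache.computeIfAbsent(callback, TrackingCallback::new);
    }

    public static void main(String[] args) {
        CallbackCacheSketch sketch = new CallbackCacheSketch();
        Callback original = (metadata, exception) -> System.out.println("original invoked");
        Callback first = sketch.wrap(original);
        Callback second = sketch.wrap(original);
        System.out.println("same wrapper reused: " + (first == second));
        first.onCompletion(null, null);
        second.onCompletion(null, null);
        System.out.println("pending after completion: " + sketch.pendingRecords.get());
    }
}
```

Because the wrapper holds no per-record state, sharing one instance across many `send` calls is safe in this sketch: each completion still decrements the counter once and still reaches the original callback. An evicted wrapper that is still in flight keeps working too, since eviction only removes it from the map.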