AHeise commented on code in PR #70:
URL: https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1671715837


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
         return props;
     }
 
+    public long getPendingRecordsCount() {
+        return pendingRecords.get();
+    }
+
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
-        return super.send(record, callback);
+        pendingRecords.incrementAndGet();
+        return super.send(record, new TrackingCallback(callback));

Review Comment:
   I can't quite follow. I was proposing to use
   
   `return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));`
   
   So we have 3 cases:
   - New callback: wrap it in a `TrackingCallback` and cache the wrapper.
   - Existing callback (common case): retrieve the cached wrapper and reuse it.
   - Remove an existing `TrackingCallback` from the cache if it is full.
   
   In all cases, both the TrackingCallback and the original callback will be
   invoked. The only difference to the code without the cache is that we avoid
   creating extra TrackingCallback instances around the same original callback.
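
   For illustration only (this is not code from the PR), here is a self-contained
   sketch of the proposed wiring around a delegate producer. `pendingRecords`,
   `TrackingCallback`, and `getPendingRecordsCount` mirror the diff above; the
   wrapper class itself, the `LinkedHashMap`-based eviction, and the bound of 1024
   cached wrappers are assumptions made for the sketch, and cache thread safety is
   left out of scope.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.concurrent.Future;
   import java.util.concurrent.atomic.AtomicLong;

   import org.apache.kafka.clients.producer.Callback;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.clients.producer.RecordMetadata;

   /** Sketch only: wraps a delegate producer and caches TrackingCallback wrappers. */
   class CallbackCachingProducer<K, V> {

       private final Producer<K, V> delegate;
       private final AtomicLong pendingRecords = new AtomicLong();

       // Access-ordered map that evicts the eldest wrapper once full (third case above).
       // The bound of 1024 is an assumption, not taken from the PR.
       private final Map<Callback, TrackingCallback> callbackCache =
               new LinkedHashMap<Callback, TrackingCallback>(16, 0.75f, true) {
                   @Override
                   protected boolean removeEldestEntry(
                           Map.Entry<Callback, TrackingCallback> eldest) {
                       return size() > 1024;
                   }
               };

       CallbackCachingProducer(Producer<K, V> delegate) {
           this.delegate = delegate;
       }

       public long getPendingRecordsCount() {
           return pendingRecords.get();
       }

       public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
           pendingRecords.incrementAndGet();
           // New callback: wrap and cache it. Known callback (common case): reuse the wrapper.
           return delegate.send(
                   record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));
       }

       /** Wraps the user callback so that completion always decrements pendingRecords. */
       private final class TrackingCallback implements Callback {
           private final Callback actualCallback;

           TrackingCallback(Callback actualCallback) {
               this.actualCallback = actualCallback;
           }

           @Override
           public void onCompletion(RecordMetadata metadata, Exception exception) {
               pendingRecords.decrementAndGet();
               if (actualCallback != null) {
                   actualCallback.onCompletion(metadata, exception);
               }
           }
       }
   }
   ```

   Note that eviction only drops the wrapper from the cache; records already in
   flight keep their reference to it, so a full cache does not lose any tracking.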


