AHeise commented on code in PR #159:
URL: 
https://github.com/apache/flink-connector-kafka/pull/159#discussion_r2021015745


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java:
##########
@@ -131,26 +131,33 @@ public void recycleByTransactionId(String 
transactionalId) {
         if (!earlierTransactions.isEmpty()) {
             for (String id : earlierTransactions.values()) {
                 ProducerEntry entry = producerByTransactionalId.remove(id);
-                recycleProducer(entry.getProducer());
+                recycleProducer(entry.getProducer(), false);
             }
             earlierTransactions.clear();
         }
     }
 
     @Override
     public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
-        recycleProducer(producer);
+        recycleProducer(producer, true);
         ProducerEntry producerEntry =
                 
producerByTransactionalId.remove(producer.getTransactionalId());
         
transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
     }
 
-    private void recycleProducer(@Nullable FlinkKafkaInternalProducer<byte[], 
byte[]> producer) {
+    private void recycleProducer(

Review Comment:
   It's a good idea but it makes the call-site messier. There are a total of 4 
cases now:
   * Producer is recycled in writer (abort loop). That would go well with your 
proposal.
   * Committer regularly commits and recycles. That would also go well with 
your proposal.
   * Special case 1: on recovery, committer commits and recycles the id, in 
that case the producer entry doesn't contain an internal producer and is null. 
So we would need to handle the null on call-site.
   * Special case 2: on non-chained committer, we may end up with having more 
transactions in the state than there are running (see the comment in 
`recycleByTransactionId`). Here, we also may have null values. 
   
   So in cases where the producer is null or reuse is false, it gets messier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to