Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147675169 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java --- @@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() { private void flushNewPartitions() { LOG.info("Flushing new partitions"); + enqueueNewPartitions().await(); + } + + private TransactionalRequestResult enqueueNewPartitions() { Object transactionManager = getValue(kafkaProducer, "transactionManager"); - Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler}); - TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result"); - Object sender = getValue(kafkaProducer, "sender"); - invoke(sender, "wakeup"); - result.await(); + synchronized (transactionManager) { + Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler"); --- End diff -- I implemented it, however in that case it would be much harder to test reliably `enqueueRequest(addPartitionsToTransactionHandler());` since in vast majority of cases this would be a dead code :/
---