I had a Flink 1.15.1 job configured with

```
execution.checkpointing.mode=EXACTLY_ONCE
```

that started failing with the following error after the first checkpoint was triggered:

```
Sink: Committer (2/2)#732 (36640a337c6ccdc733d176b18adab979) switched from INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=4521984, epoch=0, transactionalId=}
...
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value for configuration transactional.id: String must be non-empty
```

The strange thing about it is that the `KafkaSinkBuilder` was used without calling `setDeliverGuarantee`, so I expected the default delivery guarantee to apply, which is `NONE` [1]. Is that even possible to begin with? Shouldn't Kafka transactions be involved only when one follows this recipe from the builder's Javadoc [2]?

> One can also configure different {@link DeliveryGuarantee} by using {@link #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using {@link DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix {@link #setTransactionalIdPrefix(String)}.

So, in my case, without calling `setDeliverGuarantee` (nor `setTransactionalIdPrefix`), I cannot understand why I was seeing these errors. To work around the problem, I temporarily changed the checkpointing settings to

```
execution.checkpointing.mode=AT_LEAST_ONCE
```

but I'd like to understand what was happening.

[1]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66
[2]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L51

FYI I've also posted this on SO here:

- https://stackoverflow.com/questions/74342971/transactional-id-errors-when-using-kafka-sink-with-exactly-once-checkpoints
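For context, the sink in question was built roughly like the sketch below (broker, topic, and serializer are placeholders, not the actual job's values); note that neither `setDeliverGuarantee` nor `setTransactionalIdPrefix` is called:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch of the sink definition (flink-connector-kafka 1.15.x).
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic") // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // No setDeliverGuarantee(...) here, so the builder's default
                // DeliveryGuarantee.NONE should apply, and no
                // setTransactionalIdPrefix(...) either -- which is why the
                // transactional.id error above is surprising.
                .build();
```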
that was failing with the following error ``` Sink: Committer (2/2)#732 (36640a337c6ccdc733d176b18adab979) switched from INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=4521984, epoch=0, transactionalId=} ... Caused by: org.apache.kafka.common.config.ConfigException: Invalid value for configuration transactional.id: String must be non-empty ``` that happened after the first checkpoint was triggered. The strange thing about it is that the `KafkaSinkBuilder` was used without calling `setDeliverGuarantee`, and hence the default delivery guarantee was expected to be used, which is `NONE` [1]. Is that even possible to start with? Shouldn't kafka transactions be involved only when one follows [this recipe] [2]: * <p>One can also configure different {@link DeliveryGuarantee} by using {@link * #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using {@link * DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix {@link * #setTransactionalIdPrefix(String)}. So, in my case, without calling `setDeliverGuarantee` (nor `setTransactionalIdPrefix`), I cannot understand why I was seeing these errors. To avoid the problem, I temporarily changed the checkpointing settings to ``` execution.checkpointing.mode=`AT_LEAST_ONCE` ``` but I'd like to understand what was happening. [1]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66 [2]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L51 FYI I've also posted this in SO here: - https://stackoverflow.com/questions/74342971/transactional-id-errors-when-using-kafka-sink-with-exactly-once-checkpoints