[hotfix][kafka] Move checkpointing enable checking to initializeState. initializeState is called before open, and since both of those functions rely on the chosen semantic, the checkpointing-enabled check should happen in initializeState.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/425ffe26 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/425ffe26 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/425ffe26 Branch: refs/heads/master Commit: 425ffe268f0c5aceac084b522af04736d2298da7 Parents: 856b6ba Author: Piotr Nowojski <[email protected]> Authored: Wed Oct 25 18:08:46 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 2 12:43:20 2017 +0800 ---------------------------------------------------------------------- .../streaming/connectors/kafka/FlinkKafkaProducer011.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/425ffe26/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 6242a20..a69c730 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -524,11 +524,6 @@ public class FlinkKafkaProducer011<IN> */ @Override public void open(Configuration configuration) throws Exception { - if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { - LOG.warn("Using {} semantic, but checkpointing is not enabled. 
Switching to {} semantic.", semantic, Semantic.NONE); - semantic = Semantic.NONE; - } - if (logFailuresOnly) { callback = new Callback() { @Override @@ -787,6 +782,11 @@ public class FlinkKafkaProducer011<IN> @Override public void initializeState(FunctionInitializationContext context) throws Exception { + if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE); + semantic = Semantic.NONE; + } + nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
