[
https://issues.apache.org/jira/browse/FLINK-21057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-21057:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was:
auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned, and neither it nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label, or in 7 days the issue will be deprioritized.
> Streaming checkpointing with small interval leads app to hang
> -------------------------------------------------------------
>
> Key: FLINK-21057
> URL: https://issues.apache.org/jira/browse/FLINK-21057
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.11.3
> Environment: * streaming app
> * flink cluster in standalone-job / application mode
> * 1.11.3 Flink version
> * jobmanager --> 1 instance
> * taskmanager --> 1 instance
> * parallelism --> 2
> Reporter: Nazar Volynets
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
> Attachments: jobmanager.log, taskmanager.log
>
>
> There is a simple streaming app with enabled checkpointing:
> * statebackend --> RocksDB
> * mode --> EXACTLY_ONCE
> STRs:
> 1. Run Flink cluster in standalone-job / application mode (with embedded
> streaming app)
> 2. Get error
> 3. Wait 1 min
> 4. Stop Flink cluster
> 5. Repeat steps 1 to 4 until the following error appears:
> {code:java|title=taskmanager}
> org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) ~[?:?]
>     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) ~[?:?]
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[?:?]
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) ~[?:?]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?]
>     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) ~[?:?]
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[?:?]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> {code}
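>
> For context on the error: it means the broker expired or fenced the transactional producer before the restarted job could resume committing its transaction. With EXACTLY_ONCE sinks, the configured transaction timeout has to outlive one checkpoint cycle plus the worst-case downtime between failure and recovery. A minimal, self-contained sketch of that sanity check (plain Java; the downtime figure is a hypothetical assumption, the other numbers mirror the settings in the example below):
> {code:java|title=TxnTimeoutCheck (illustrative sketch)}
> public class TxnTimeoutCheck {
>     public static void main(String[] args) {
>         long checkpointIntervalMs = 500;             // env.enableCheckpointing(500)
>         long minPauseMs = 1_000;                     // setMinPauseBetweenCheckpoints(1000)
>         long transactionTimeoutMs = 15 * 60 * 1000;  // TRANSACTION_TIMEOUT_CONFIG in the example
>         long assumedMaxDowntimeMs = 10 * 60 * 1000;  // hypothetical worst-case restart gap
>
>         // A transaction opened for a checkpoint must still be alive when the
>         // recovered job commits it; otherwise InitProducerId fails with the
>         // "old epoch" error shown above.
>         long requiredMs = checkpointIntervalMs + minPauseMs + assumedMaxDowntimeMs;
>         System.out.println("required headroom: " + requiredMs + " ms");
>         System.out.println("configured:        " + transactionTimeoutMs + " ms");
>         System.out.println("safe: " + (transactionTimeoutMs > requiredMs));
>     }
> }
> {code}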
> At this point the app hangs, as the title describes.
> Please find below:
> * streaming app code base (example)
> * attached logs
> ** jobmanager
> ** taskmanager
> *Example*
> +App+
> {code:java|title=build.gradle (dependencies)}
> ...
> ext {
>     ...
>     javaVersion = '11'
>     flinkVersion = '1.12.0'
>     scalaBinaryVersion = '2.11'
>     ...
> }
>
> dependencies {
>     ...
>     implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
>     implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
>     ...
> }
> {code}
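>
> Note a version mismatch worth double-checking: the issue's "Affects Versions" field says 1.11.3, but the build script pins flinkVersion = '1.12.0'. A hypothetical alignment, if the reproduction is meant to target 1.11.3:
> {code:java|title=build.gradle (version alignment, illustrative)}
> ext {
>     flinkVersion = '1.11.3' // match the reported "Affects Versions"
> }
> {code}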
> {code:java|title=App}
> public static void main(String[] args) {
>     ...
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(2);
>     env.enableCheckpointing(500);
>     env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>     env.getCheckpointConfig().setCheckpointTimeout(600000);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>     FlinkKafkaConsumer<Record> consumer = createConsumer();
>     FlinkKafkaProducer<Record> producer = createProducer();
>
>     env.addSource(consumer)
>        .uid("kafka-consumer")
>        .addSink(producer)
>        .uid("kafka-producer");
>
>     env.execute();
> }
>
> public static FlinkKafkaConsumer<Record> createConsumer() {
>     ...
>     Properties props = new Properties();
>     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
>     ... // nothing special
>     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>
>     // RecordKafkaDerSchema --> custom schema used to copy the message key as well as the body;
>     // SimpleStringSchema can be used instead to reproduce the issue
>     FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
>     consumer.setStartFromGroupOffsets();
>     consumer.setCommitOffsetsOnCheckpoints(true);
>     return consumer;
> }
>
> public static FlinkKafkaProducer<Record> createProducer() {
>     ...
>     Properties props = new Properties();
>     ...
>     props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
>     props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>     props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
>     props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
>     props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>     props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
>     props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
>     props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>     props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
>     props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
>     ...
>
>     // RecordKafkaSerSchema --> custom schema used to copy the message key as well as the body;
>     // SimpleStringSchema can be used instead to reproduce the issue
>     FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1",
>         new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>     return producer;
> }
> {code}
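>
> One mitigation worth noting (per the Flink Kafka connector documentation): the broker-side transaction.max.timeout.ms defaults to 900000 ms (15 minutes), which is exactly the producer transaction timeout configured above, leaving no headroom. Raising the broker cap (hypothetical value) would allow a larger producer timeout:
> {code:java|title=server.properties (Kafka broker, illustrative)}
> # default is 900000 (15 min); raise it so the producer's transaction.timeout.ms can exceed it
> transaction.max.timeout.ms=3600000
> {code}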
--
This message was sent by Atlassian Jira
(v8.3.4#803005)