[jira] [Created] (FLINK-21056) Streaming checkpointing is failing occasionally

2021-01-20 Thread Nazar Volynets (Jira)
Nazar Volynets created FLINK-21056:
--

 Summary: Streaming checkpointing is failing occasionally
 Key: FLINK-21056
 URL: https://issues.apache.org/jira/browse/FLINK-21056
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.3
 Environment: * streaming app
* flink cluster in standalone-job / application mode
* 1.11.3 Flink version
* jobmanager --> 1 instance
* taskmanager --> 1 instance
* parallelism --> 2
Reporter: Nazar Volynets
 Attachments: jobmanager.log, taskmanager.log

There is a simple streaming app with checkpointing enabled:
 * statebackend --> RocksDB
 * mode --> EXACTLY_ONCE

STRs:
 1. Run Flink cluster in standalone-job / application mode (with embedded 
streaming app)
 2. Wait 10 minutes
 3. Restart Flink cluster (& consequently streaming app)
 4. Repeat steps #1 to #3 until you get a checkpointing error:
{code:java|title=taskmanager}
2021-01-19 12:09:39,719 INFO  
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - 
Could not complete snapshot 21 for operator Source: Custom Source -> Sink: 
Unnamed (1/2). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 21 for operator Source: Custom Source -> Sink: Unnamed (1/2). Failure 
reason: Checkpoint was declined.
...
Caused by: org.apache.flink.util.SerializedThrowable: Timeout expired after 
6milliseconds while awaiting InitProducerId
{code}
Based on the stack trace alone it is quite tricky to determine the root cause.
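
For context, this particular timeout appears to be raised from the Kafka producer's {{initTransactions()}} call, which waits at most {{max.block.ms}} (60000 ms by default) for the InitProducerId response from the transaction coordinator. Below is a minimal sketch of that knob, assuming the standard Kafka client API; the helper class and the value are illustrative only and are not part of the app above:
{code:java|title=Illustration (not from the app above)}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical helper: copies the producer properties and raises max.block.ms,
// which caps how long initTransactions() waits for the InitProducerId response.
// Raising it only widens the wait window; it does not explain why the
// transaction coordinator is slow to answer in the first place.
public class ProducerInitTimeoutTweak {
  public static Properties withLargerInitTimeout(Properties base) {
    Properties props = new Properties();
    props.putAll(base);
    props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "120000"); // default is 60000 ms
    return props;
  }
}
{code}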

Please find below:
 * streaming app code base (example)
 * attached logs
 ** jobmanager
 ** taskmanager

*Example*

+App+
{code:java|title=build.gradle (dependencies)}
...
ext {
  ...
  javaVersion = '11'
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  implementation 
"org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
  implementation 
"org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
  ...
}
{code}
{code:java|title=App}
public static void main(String[] args) {
  ...
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

  env.setParallelism(2);

  env.enableCheckpointing(1);

  env.setStateBackend(new 
RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));

  
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  env.getCheckpointConfig().setCheckpointTimeout(60);
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

  FlinkKafkaConsumer consumer = createConsumer();
  FlinkKafkaProducer producer = createProducer();

  env
.addSource(consumer)
.uid("kafka-consumer")
.addSink(producer)
.uid("kafka-producer")
  ;

  env.execute();
}

public static FlinkKafkaConsumer createConsumer() {
  ...
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-source-1:9091");
  ... // nothing special
  props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

  FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("topic-1", new 
RecordKafkaDerSchema(), props);

  ... // RecordKafkaDerSchema --> custom schema is used to copy not only 
message body but message key too
  ... // SimpleStringSchema --> can be used instead to reproduce issue

  consumer.setStartFromGroupOffsets();
  consumer.setCommitOffsetsOnCheckpoints(true);

  return consumer;
}

public static FlinkKafkaProducer createProducer() {
  ...
  Properties props = new Properties();
  ...
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-target-1:9094");
  props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
  props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
  props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
  props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
  props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
  props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
  props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored 
due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
  props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 
1000)); // decreased from 1 hour to 15 mins; app is going to be restarted 
quickly
  ...
  FlinkKafkaProducer producer = new FlinkKafkaProducer<>("topic-1", new 
RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

  ... // RecordKafkaSerSchema --> custom schema is used to copy not only

[jira] [Created] (FLINK-21057) Streaming checkpointing with small interval leads app to hang

2021-01-20 Thread Nazar Volynets (Jira)
Nazar Volynets created FLINK-21057:
--

 Summary: Streaming checkpointing with small interval leads app to 
hang
 Key: FLINK-21057
 URL: https://issues.apache.org/jira/browse/FLINK-21057
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.3
 Environment: * streaming app
* flink cluster in standalone-job / application mode
* 1.11.3 Flink version
* jobmanager --> 1 instance
* taskmanager --> 1 instance
* parallelism --> 2
Reporter: Nazar Volynets
 Attachments: jobmanager.log, taskmanager.log

There is a simple streaming app with checkpointing enabled:
 * statebackend --> RocksDB
 * mode --> EXACTLY_ONCE

STRs:
 1. Run Flink cluster in standalone-job / application mode (with embedded 
streaming app)
 2. Get the error
 3. Wait 1 min
 4. Stop the Flink cluster
 5. Repeat steps #1 to #4 until you get the error below:
{code:java|title=taskmanager}
org.apache.kafka.common.KafkaException: Unexpected error in 
InitProducerIdResponse; Producer attempted an operation with an old epoch. 
Either there is a newer producer with the same transactionalId, or the 
producer's transaction has been expired by the broker.
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
 ~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
 ~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
 ~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
 ~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) 
~[?:?]
flink-kafka-mirror-maker-jobmanager |   at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
flink-kafka-mirror-maker-jobmanager |   at java.lang.Thread.run(Unknown 
Source) ~[?:?]
{code}

It is obvious from the stack trace that the producer was fenced (there is a newer producer with the same transactionalId, or its transaction was expired by the broker), but it is not clear why a small checkpoint interval leads to this and to the app hanging.

Please find below:
 * streaming app code base (example)
 * attached logs
 ** jobmanager
 ** taskmanager

*Example*

+App+
{code:java|title=build.gradle (dependencies)}
...
ext {
  ...
  javaVersion = '11'
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  implementation 
"org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
  implementation 
"org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
  ...
}
{code}
{code:java|title=App}
public static void main(String[] args) {
  ...
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

  env.setParallelism(2);

  env.enableCheckpointing(500);

  env.setStateBackend(new 
RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));

  
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  env.getCheckpointConfig().setCheckpointTimeout(60);
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

  FlinkKafkaConsumer consumer = createConsumer();
  FlinkKafkaProducer producer = createProducer();

  env
.addSource(consumer)
.uid("kafka-consumer")
.addSink(producer)
.uid("kafka-producer")
  ;

  env.execute();
}

public static FlinkKafkaConsumer createConsumer() {
  ...
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-source-1:9091");
  ... // nothing special
  props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

  FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("topic-1", new 
RecordKafkaDerSchema(), props);

  ... // RecordKafkaDerSchema --> custom schema is used to copy not only 
message body but message key too
  ... // SimpleStringSchema --> can be used instead to reproduce issue

  consumer.setStartFromGroupOffsets();
  consumer.setCommitOffsetsOnCheckpoints(true);

  return consumer;
}

public static FlinkKafkaProducer createProducer() {
  ...
  Properties props = new Properties();
  ...
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-target-1:9094");
  props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
  props.setProperty(ProducerConfig.ACKS_

[jira] [Created] (FLINK-20775) Missed Docker Images Flink 1.12

2020-12-27 Thread Nazar Volynets (Jira)
Nazar Volynets created FLINK-20775:
--

 Summary: Missed Docker Images Flink 1.12
 Key: FLINK-20775
 URL: https://issues.apache.org/jira/browse/FLINK-20775
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.12.0
Reporter: Nazar Volynets


Apache Flink 1.12 has been released, but the corresponding images have not been published to Flink's *official* Docker Hub repo:
[https://hub.docker.com/_/flink?tab=tags&page=1&ordering=last_updated&name=1.12]

Consequently, the missing image(s) *block* using Apache Flink 1.12 to spin up Flink in Standalone Per-Job mode within Kubernetes.





[jira] [Created] (FLINK-20753) Duplicates With Exactly-once Kafka -> Kafka Producer

2020-12-23 Thread Nazar Volynets (Jira)
Nazar Volynets created FLINK-20753:
--

 Summary: Duplicates With Exactly-once Kafka -> Kafka Producer
 Key: FLINK-20753
 URL: https://issues.apache.org/jira/browse/FLINK-20753
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Runtime / Checkpointing
Affects Versions: 1.12.0
 Environment: Java 11
Flink started within the IDE
Reporter: Nazar Volynets


*Introduction*

Based on the following statements from Flink's docs:

1. 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html]
{quote}Flink provides an [Apache Kafka|https://kafka.apache.org/] connector for 
reading data from and writing data to Kafka topics with exactly-once guarantees.
{quote}
2. 
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#exactly-once-end-to-end]
{quote}To achieve exactly once end-to-end, so that every event from the sources 
affects the sinks exactly once, the following must be true:
 # your sources must be replayable, and
 # your sinks must be transactional (or idempotent){quote}
3. 
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#caveats]
{quote}{{Semantic.EXACTLY_ONCE}} mode relies on the ability to commit 
transactions that were started before taking a checkpoint, after recovering 
from the said checkpoint. If the time between Flink application crash and 
completed restart is larger than Kafka's transaction timeout there will be data 
loss (Kafka will automatically abort transactions that exceeded timeout time)
{quote}
4. [https://issues.apache.org/jira/browse/FLINK-7210]
There are references to the two-phase commit mechanism used in the old Flink Kafka connector, so it is expected that the latest version of the connector offers the same functionality.

there is an implicit expectation of EXACTLY_ONCE Kafka -> Kafka end-to-end delivery guarantees.

Moreover, it is emphasised that the Kafka transaction timeout should be increased (from the default 15 minutes up to 1 hour) to avoid data loss.
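
As a concrete illustration of that recommendation, here is a minimal sketch, assuming the Kafka producer API and the 1-hour value from the docs; the broker must allow it via {{transaction.max.timeout.ms}}, whose default is 15 minutes, and the class name is illustrative only:
{code:java|title=Illustration (not from the app below)}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

// Sketch only: producer-side transaction timeout of 1 hour for EXACTLY_ONCE.
// The broker-side transaction.max.timeout.ms must be >= this value, otherwise
// the producer will fail at initialization.
public class TransactionTimeoutExample {
  public static Properties withOneHourTransactionTimeout() {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(60 * 60 * 1000));
    return props;
  }
}
{code}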

Moving forward, all three of these statements are met by a `Kafka Source` -> `Kafka Sink` app:
 * the first one -> the app reads from and writes to Kafka
 * the second one -> `Kafka Source` is replayable & `Kafka Sink` is transactional
 * the last one -> `Kafka Sink` is transactional and consequently, in the case of EXACTLY_ONCE, this operator has state; so it is expected that the transaction will be rolled back.

But in fact it is not possible to achieve EXACTLY_ONCE for a simple Flink `Kafka Source` -> `Kafka Sink` application. Duplicates still exist, and as a result the EXACTLY_ONCE semantics are violated.

*Details*

+STRs:+
 # Create simple Flink's `Kafka Source` -> `Kafka Sink` app
 ## Stream execution env:
 ### Parallelism -> 1
 ### Enable checkpointing -> 1 ms (intentionally set to an extreme value)
 ### State backend -> RocksDB
 ### Checkpointing mode -> EXACTLY_ONCE
 ### Min pause between checkpoints -> 500 ms
 ### Max concurrent checkpoints -> 1
 ## Flink Kafka consumer
 ### Nothing valuable
 ## Flink Kafka producer
 ### Props:
 #### ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"
 #### ProducerConfig.ACKS_CONFIG, "all"
 #### ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"
 ### EXACTLY_ONCE Semantic
 # Deploy `Kafka Source` Cluster
 ## Create `topic-1` with 3 partitions
 # Deploy `Kafka Sink` Cluster
 ## Create `topic-1` with 3 partitions
 # Spin up some Kafka client to generate data into `Kafka Source`:`topic-1` 
(e.g. Confluent `kafka-console-producer`)
 # Spin up +transactional+ Kafka consumer to drain data from `Kafka 
Sink`:`topic-1` (e.g. Confluent `kafka-console-consumer`)
 # Use the Flink app described in step #1 to ship data from the `Kafka Source` cluster to the `Kafka Sink` cluster.
 # Wait until the Flink app creates its first checkpoint.
 # Brutally kill Flink's app (SIGKILL)
 # Wait 10 secs
 # Start the Flink app again.
 # Check for duplicates in the +transactional+ Kafka consumer (described in step #5); a sketch of such a consumer follows this list.
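
For clarity, here is a minimal sketch of what is meant above by a +transactional+ (read-committed) consumer, assuming the plain Kafka Java client; the class name and group id are illustrative, while the bootstrap server and topic follow the example below:
{code:java|title=Illustration (not from the app below)}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative read-committed consumer: it only sees records from committed
// Kafka transactions, so any duplicates in its output violate EXACTLY_ONCE.
public class ReadCommittedChecker {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094"); // sink cluster from the example
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-checker");            // illustrative
    props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("topic-1"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.key() + " -> " + record.value()); // inspect output for duplicates
        }
      }
    }
  }
}
{code}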

+Actual+

Duplicates exist in the +transactional+ Kafka consumer output.

+Expected+
 * the Kafka transaction should be rolled back by the Flink Kafka producer with EXACTLY_ONCE semantics
 * Flink should automatically replay the data from `Kafka Source` based on the offsets persisted in the latest checkpoint

*Example*

+App+
{code:java|title=build.gradle (dependencies)}
...
ext {
  ...
  javaVersion = '11'
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  implementation 
"org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
  implementation 
"org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
  ...
}
{code}
{code:java|title=App}
public static void main(String[] args) {
  ...
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

  env.se