Kezhu Wang created FLINK-21522:
----------------------------------
Summary: Iterative stream could not work with stop-with-savepoint
Key: FLINK-21522
URL: https://issues.apache.org/jira/browse/FLINK-21522
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.12.1, 1.11.3, 1.13.0
Reporter: Kezhu Wang
User reports this in user mail list:
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-functions-2-2-and-stop-with-savepoint-td41772.html]
I copied the full mail body here:
{quote}
I have an embedded function with a SinkFunction as an egress, implemented as
this pseudo-code:
val serializationSchema = KafkaSchemaSerializationSchema(... props required to
use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))
Checkpointing and taking a savepoint without stopping work as expected.
However, when I run "flink stop <job-id>" or even "flink stop --drain
<job-id>", the operation never completes, reporting IN_PROGRESS until I hit the
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint expired before completing" CompletedException.
In the "Checkpoint History" it shows only 2 of my 3 operators completed their
work:
Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) |
end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% |
end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B
I've been unable to gain any insights from logs so far. Thoughts?
{quote}
I think it is what we overlooked in evaluation ofÂ
[apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033],
FLINK-21132 and FLINK-21133.
I think the problem is two folds in current implementation:
# {{StreamIterationHead}} does not finish itself.
# There is a local feedback from {{StreamIterationTail}} to
{{StreamIterationHead}} which could cause {{StreamIterationTail}} blocking
after {{StreamIterationHead}} finished . Globally speaking, it is a loop.
[~pnowojski] emphasized this in FLINK-21132 and FLINK-21133.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)