Rafał Trójczak created FLINK-34414:
--------------------------------------
Summary: EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector
Key: FLINK-34414
URL: https://issues.apache.org/jira/browse/FLINK-34414
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: Rafał Trójczak
I am using the Pulsar connector for Flink (version 4.1.0-1.17) with a Flink job
(version 1.17.2). When an exception is thrown within the job, the job is
restarted and resumes from the last checkpoint, but the sink writes more events
to the output topic than it should, even though the EXACTLY_ONCE guarantee is
set everywhere. To be more specific, here is my job's flow:
* a Pulsar source that reads from the input topic,
* a simple processing function,
* and a sink that writes to the output topic.
Here is a fragment of the source creation:
{code:java}
.setDeserializationSchema(Schema.AVRO(inClass), inClass)
.setSubscriptionName(subscription)
.enableSchemaEvolution()
.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
.setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
.setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false);
{code}
Here is a fragment of the sink creation:
{code:java}
.setSerializationSchema(Schema.AVRO(outClass), outClass)
.setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
.setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
{code}
And here is the Flink environment preparation:
{code:java}
environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
{code}
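The behavior expected from this configuration can be illustrated with a deliberately simplified, single-process model. This is a hypothetical sketch, not Flink's or the Pulsar connector's actual implementation: after a failure the source rewinds to the offset stored in the last completed checkpoint, and a transactional (EXACTLY_ONCE) sink only makes records visible when that checkpoint's transaction is committed, so records written after the checkpoint are discarded instead of duplicated. The names ReplayModel, run, checkpointEvery and failAt are illustrative only, and the real two-phase commit (pre-commit on checkpoint, commit once the checkpoint completes) is collapsed into a single step.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of checkpoint-based recovery; NOT Flink's or the
// Pulsar connector's implementation. One failure is injected at offset failAt.
class ReplayModel {

    // Returns the records visible in the "output topic" after the job finishes.
    static List<Integer> run(int totalEvents, int checkpointEvery, int failAt, boolean transactional) {
        List<Integer> output = new ArrayList<>();  // committed / visible records
        List<Integer> pending = new ArrayList<>(); // records in the open transaction
        int checkpointedOffset = 0;                // source position in the last checkpoint
        boolean failed = false;
        int offset = 0;

        while (offset < totalEvents) {
            if (!failed && offset == failAt) {
                // Simulated exception: restart from the last completed checkpoint.
                failed = true;
                pending.clear();                   // the uncommitted transaction is aborted
                offset = checkpointedOffset;       // the source rewinds
                continue;
            }
            int event = offset;                    // the processing function is identity here
            if (transactional) {
                pending.add(event);                // visible only after the next commit
            } else {
                output.add(event);                 // visible immediately
            }
            offset++;
            if (offset % checkpointEvery == 0 || offset == totalEvents) {
                // Checkpoint completes: store the source position and commit the transaction.
                checkpointedOffset = offset;
                output.addAll(pending);
                pending.clear();
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 100, 250, false).size()); // 1050
        System.out.println(run(1000, 100, 250, true).size());  // 1000
    }
}
{code}

With a non-transactional sink, the 50 events processed between the last checkpoint (offset 200) and the failure (offset 250) are written twice; with the transactional sink they are discarded and replayed once, so the output contains exactly 1000 events, which is what the EXACTLY_ONCE settings above are expected to guarantee.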
After sending 1000 events to the input topic, I got 1048 events on the output
topic.
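One way to narrow this down is to compare the IDs of the events sent to the input topic with the IDs read back from the output topic. This is a hypothetical sketch: it assumes each event carries a unique ID that the processing function preserves (the event class in the MRE may not have one), and the names DuplicateReport, extraRecords, duplicatedIds and missing are illustrative. It reports how many records exceed one per distinct ID, which IDs were duplicated, and which IDs were lost.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical checker: assumes every event carries a unique ID that survives processing.
class DuplicateReport {

    // Counts how often each ID occurs in the output topic.
    static Map<String, Integer> occurrences(List<String> outputIds) {
        Map<String, Integer> counts = new HashMap<>();
        for (String id : outputIds) {
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    // Output records beyond one per distinct ID.
    static int extraRecords(List<String> outputIds) {
        return outputIds.size() - occurrences(outputIds).size();
    }

    // IDs written more than once, sorted for readability.
    static List<String> duplicatedIds(List<String> outputIds) {
        return occurrences(outputIds).entrySet().stream()
                .filter(e -> e.getValue() > 1)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    // IDs sent to the input topic that never reached the output topic.
    static List<String> missing(List<String> inputIds, List<String> outputIds) {
        Set<String> seen = occurrences(outputIds).keySet();
        return inputIds.stream()
                .filter(id -> !seen.contains(id))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sent = List.of("e1", "e2", "e3", "e4");
        List<String> received = List.of("e1", "e2", "e2", "e3", "e3");
        System.out.println(extraRecords(received));  // 2
        System.out.println(duplicatedIds(received)); // [e2, e3]
        System.out.println(missing(sent, received)); // [e4]
    }
}
{code}

If all 1000 input IDs appear in the output, extraRecords would return 48 for the 1048 records observed. If the duplicated IDs form a contiguous block ending where the exception was thrown, that would suggest they were replayed from the last checkpoint without the sink's transaction being aborted.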
I ran the job on my local Kubernetes cluster using the Flink Kubernetes Operator.
Here is a minimal reproducible example (MRE) for this problem (note that it has
an internal dependency, but that dependency can be commented out together with
the code that relies on it):
[https://github.com/trojczak/flink-pulsar-connector-problem]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)