Rafał Trójczak created FLINK-34414:
--------------------------------------

             Summary: EXACTLY_ONCE guarantee doesn't work properly for Flink/Pulsar connector
                 Key: FLINK-34414
                 URL: https://issues.apache.org/jira/browse/FLINK-34414
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
    Affects Versions: 1.17.2
            Reporter: Rafał Trójczak


I am using the Pulsar connector for Flink (version 4.1.0-1.17) with a Flink job
(version 1.17.2). When an exception is thrown within the job, the job is
restarted and resumes from the last checkpoint, but the sink writes more events
to the output topic than it should, even though the EXACTLY_ONCE guarantee is
set everywhere. To be more specific, here is my job's flow:
 * a Pulsar source that reads from the input topic,
 * a simple processing function,
 * and a sink that writes to the output topic.
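To make the failure mode concrete, here is a toy model in plain Java of a job that restarts from its last completed checkpoint while the sink writes every record immediately, i.e. with only at-least-once semantics. It is not Flink or connector code, and the class and method names are hypothetical. It assumes the source rewinds to the position stored in the last completed checkpoint and that a checkpoint completes after every checkpointEvery events, so every event processed between that checkpoint and the failure is written a second time:

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Flink or connector code): a job that restarts from its last
 * completed checkpoint while the sink writes every record immediately.
 * Class and method names are hypothetical.
 */
final class AtLeastOnceReplay {

    /**
     * Processes events 0..totalEvents-1. A checkpoint completes after every
     * checkpointEvery processed events. The job fails once, right after
     * processing event failAfter, and resumes from the source position stored
     * in the last completed checkpoint.
     *
     * @return all records the sink wrote, in write order
     */
    static List<Integer> run(int totalEvents, int checkpointEvery, int failAfter) {
        List<Integer> output = new ArrayList<>();
        int checkpointedPosition = 0; // source position saved by the last completed checkpoint
        int position = 0;             // next event the source will emit
        boolean failed = false;
        while (position < totalEvents) {
            output.add(position); // written right away, outside any transaction
            position++;
            if (position % checkpointEvery == 0) {
                checkpointedPosition = position; // checkpoint completes
            }
            if (!failed && position == failAfter + 1) {
                failed = true;
                position = checkpointedPosition; // restart: source rewinds, written output stays
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> output = run(10, 4, 5);
        System.out.println(output.size() + " records: " + output);
    }
}
{code}

For example, run(10, 4, 5) writes events 4 and 5 twice, giving 12 output records instead of 10. This is the kind of duplication an EXACTLY_ONCE sink is meant to prevent.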

Here is a fragment of the source creation:
{code:java}
    .setDeserializationSchema(Schema.AVRO(inClass), inClass)
    .setSubscriptionName(subscription)
    .enableSchemaEvolution()
    .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
    .setConfig(PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, true)
    .setConfig(PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS, 1)
    .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, false);
{code}
Here is a fragment of the sink creation:
{code:java}
    .setSerializationSchema(Schema.AVRO(outClass), outClass)
    .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
    .setConfig(PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE, DeliveryGuarantee.EXACTLY_ONCE)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
{code}
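The general idea behind an EXACTLY_ONCE sink is a two-phase commit: records are written inside a transaction that becomes visible downstream only once the checkpoint covering them completes, and an open transaction is aborted on failure. The sketch below is a simplified, hypothetical model of that idea in plain Java. It collapses pre-commit and commit into a single step and is not the Pulsar connector's implementation. Under a restart from the last checkpoint, as described above, each event still reaches the output exactly once:

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of a two-phase-commit sink. Pre-commit and
 * commit are collapsed into one step; this is not the Pulsar connector's code.
 */
final class TransactionalSinkModel {

    private final List<Integer> committed = new ArrayList<>();       // visible downstream
    private final List<Integer> openTransaction = new ArrayList<>(); // not yet visible

    void write(int record) {
        openTransaction.add(record);
    }

    void commit() { // called when the checkpoint covering these records completes
        committed.addAll(openTransaction);
        openTransaction.clear();
    }

    void abort() { // called on failure: uncommitted records are discarded
        openTransaction.clear();
    }

    /** Checkpoint every checkpointEvery events, fail once after failAfter, then rewind the source. */
    static List<Integer> run(int totalEvents, int checkpointEvery, int failAfter) {
        TransactionalSinkModel sink = new TransactionalSinkModel();
        int checkpointedPosition = 0; // source position saved by the last completed checkpoint
        int position = 0;
        boolean failed = false;
        while (position < totalEvents) {
            sink.write(position);
            position++;
            if (position % checkpointEvery == 0) {
                checkpointedPosition = position;
                sink.commit(); // checkpoint completes, transaction is committed
            }
            if (!failed && position == failAfter + 1) {
                failed = true;
                sink.abort();                    // uncommitted writes never become visible
                position = checkpointedPosition; // source rewinds to the checkpoint
            }
        }
        sink.commit(); // final checkpoint once the input is exhausted
        return sink.committed;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4, 5));
    }
}
{code}

With run(10, 4, 5), events 4 and 5 are written twice, but the first copies are discarded by the abort, so the committed output is exactly 0 to 9. If the real sink behaved like this, 1000 input events would be expected to yield 1000 output events.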
And here is the Flink environment preparation:
{code:java}
    environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    environment.enableCheckpointing(CHECKPOINTING_INTERVAL, CheckpointingMode.EXACTLY_ONCE);
{code}
After sending 1000 events to the input topic, I got 1048 events on the output
topic.
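One way to check whether the 48 extra events (1048 - 1000) are exact duplicates of input events, as a replay after restart would produce, is to compare the event identifiers read back from both topics. The sketch below assumes each event carries a unique ID (a hypothetical field, not part of the original job) and that the IDs have already been read from the topics into Java collections; OutputAudit and its methods are illustrative names only:

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical helper for auditing the output topic against the input topic by event ID. */
final class OutputAudit {

    /** Maps each ID that occurs more than once in the output to its number of extra copies. */
    static Map<String, Long> duplicates(List<String> outputIds) {
        Map<String, Long> counts = outputIds.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                .filter(e -> e.getValue() > 1)
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - 1L));
    }

    /** Total number of output records beyond the first copy of each ID. */
    static long extraRecords(List<String> outputIds) {
        return duplicates(outputIds).values().stream().mapToLong(Long::longValue).sum();
    }

    /** IDs that were sent to the input topic but never appeared in the output. */
    static Set<String> missing(Set<String> inputIds, List<String> outputIds) {
        Set<String> result = new HashSet<>(inputIds);
        result.removeAll(new HashSet<>(outputIds));
        return result;
    }

    public static void main(String[] args) {
        List<String> output = List.of("e1", "e2", "e2", "e3");
        System.out.println("extra: " + extraRecords(output)
                + ", missing: " + missing(Set.of("e1", "e2", "e3", "e4"), output));
    }
}
{code}

If extraRecords reports 48 and missing returns an empty set, every extra event is an exact duplicate and no event was lost.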

I ran the job on my local Kubernetes cluster using the Flink Kubernetes Operator.

Here is the MRE (minimal reproducible example) for this problem. Note that it
has an internal dependency, but it can be commented out together with the code
that relies on it:
[https://github.com/trojczak/flink-pulsar-connector-problem]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
