[ 
https://issues.apache.org/jira/browse/FLINK-38600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Babenko updated FLINK-38600:
-----------------------------------
    Affects Version/s: pulsar-4.1.0

> Pulsar connector fails with 'Exclusive consumer is already connected' due to 
> a race condition
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38600
>                 URL: https://issues.apache.org/jira/browse/FLINK-38600
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: pulsar-4.1.0
>            Reporter: Victor Babenko
>            Priority: Major
>
> The Flink Pulsar Connector has a race condition that occurs occasionally 
> (about 1-2% probability per partition in our setup). The connector 
> [creates a dummy 
> consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java#L85]
>  to seek to the right cursor position, closes it, and immediately afterwards 
> [creates the real 
> consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java#L239].
>  If the previous consumer has not yet been fully released on the broker 
> side, the broker responds with {{Exclusive consumer is already connected}}, 
> which causes the job to restart. In our case we were subscribing to 
> thousands of topics, so the job would restart continuously for hours until 
> it reached an attempt in which none of the topics hit this race condition.
> I believe this may be a regression from 
> [#59|https://github.com/apache/flink-connector-pulsar/pull/59]. The reason we 
> have to create a separate consumer to seek is described in 
> [PIP-194|https://github.com/apache/pulsar/issues/16757]. Basically, there 
> does not appear to be a way to create a consumer with the cursor already 
> set: if we create it and _then_ call {{seek}}, some messages may still leak 
> through in between. Maybe StreamNative knows of another way, but PIP-194 
> does not appear to have been adopted or implemented, so we have to seek 
> before creating the real consumer.
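
One possible mitigation for the race described above is to retry the second subscribe with a short backoff while the broker still reports the previous consumer as connected. The following is only a minimal sketch under stated assumptions, not the connector's actual code or a proposed patch: the class `SubscribeRetry`, its `retry` method, and all parameter names are hypothetical. In a real reader the `action` would be the call that creates the real consumer, and the `retryable` predicate would presumably check for the Pulsar client's `PulsarClientException.ConsumerBusyException` (an assumption about which exception the client surfaces for this broker error).

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical helper for illustration; not part of flink-connector-pulsar. */
final class SubscribeRetry {

    private SubscribeRetry() {}

    /**
     * Runs {@code action}, retrying with exponential backoff while it fails
     * with an exception that {@code retryable} accepts.
     *
     * @param action               the operation to attempt, e.g. creating the real consumer
     * @param retryable            decides whether a failure is transient (e.g. "consumer busy")
     * @param maxAttempts          total number of attempts, must be at least 1
     * @param initialBackoffMillis sleep before the second attempt; doubled after each retry
     */
    static <T> T retry(
            Callable<T> action,
            Predicate<Exception> retryable,
            int maxAttempts,
            long initialBackoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up on the last attempt or on any non-transient failure.
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                // Give the broker time to release the previous exclusive consumer.
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }
}
```

A bounded retry like this would only narrow the window rather than remove the underlying race, so it is a workaround and not a substitute for a way to create a consumer with the cursor already set.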



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
