[ https://issues.apache.org/jira/browse/FLINK-38600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18034317#comment-18034317 ]
Victor Babenko commented on FLINK-38600:
----------------------------------------
PR: [https://github.com/apache/flink-connector-pulsar/pull/112]
> Pulsar connector fails with 'Exclusive consumer is already connected' due to
> a race condition
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-38600
> URL: https://issues.apache.org/jira/browse/FLINK-38600
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Reporter: Victor Babenko
> Priority: Major
>
> There is a race condition in the Flink Pulsar connector that occasionally
> triggers (about 1-2% probability per partition in our setup). To seek to
> the right cursor position, the connector [creates a dummy
> consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java#L85],
> closes it, and immediately afterwards [creates the real
> consumer|https://github.com/apache/flink-connector-pulsar/blob/08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java#L239].
> If the broker has not yet fully released the dummy consumer, it rejects the
> real one with {{Exclusive consumer is already connected}}, and the job is
> restarted. Because we subscribe to thousands of topics, the job kept
> restarting for hours until it reached an attempt in which none of the
> topics hit this race.
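The sequence described above can be sketched with a small, deterministic toy model driven by a simulated clock. Everything in it is hypothetical: `FakeExclusiveSubscription`, `ExclusiveRaceDemo`, the 50 ms broker-side release lag and the backoff retry are illustrations, not part of the Pulsar client or the connector. The retry is only one conceivable mitigation and is not necessarily what the linked PR does.

```java
/**
 * Hypothetical in-memory model of a broker-side Exclusive subscription,
 * driven by a simulated clock so the race is deterministic. Illustration
 * only; this is not Pulsar's implementation.
 */
class FakeExclusiveSubscription {
    // Assumed lag between a client-side close and the broker freeing the slot.
    static final long RELEASE_DELAY_MS = 50;

    private boolean held = false;
    // Simulated time from which the slot counts as free again.
    private long releaseVisibleAt = 0;

    /** Tries to attach a consumer at simulated time nowMs. */
    boolean trySubscribe(long nowMs) {
        if (held && nowMs < releaseVisibleAt) {
            // The real broker would answer "Exclusive consumer is already connected".
            return false;
        }
        held = true;
        releaseVisibleAt = Long.MAX_VALUE; // occupied until close() is called
        return true;
    }

    /** Client-side close at nowMs; the broker frees the slot only later. */
    void close(long nowMs) {
        releaseVisibleAt = nowMs + RELEASE_DELAY_MS;
    }
}

class ExclusiveRaceDemo {
    /** Connector-like sequence: dummy consumer seeks, closes, real consumer subscribes at once. */
    static boolean immediateResubscribeSucceeds() {
        FakeExclusiveSubscription sub = new FakeExclusiveSubscription();
        long now = 0;
        sub.trySubscribe(now);        // dummy consumer, used only to seek
        sub.close(now);               // closed on the client side...
        return sub.trySubscribe(now); // ...and the real consumer follows immediately
    }

    /**
     * Hypothetical mitigation: retry the real subscribe with exponential backoff.
     * Returns the number of attempts needed, or -1 if every attempt failed.
     */
    static int subscribeWithBackoff(int maxAttempts, long initialBackoffMs) {
        FakeExclusiveSubscription sub = new FakeExclusiveSubscription();
        long now = 0;
        sub.trySubscribe(now);
        sub.close(now);
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sub.trySubscribe(now)) {
                return attempt;
            }
            now += backoff; // simulated sleep before the next attempt
            backoff *= 2;
        }
        return -1;
    }

    /** Chance that a restart hits the race on none of n partitions, given per-partition probability p. */
    static double probabilityNoPartitionFails(double p, int n) {
        return Math.pow(1.0 - p, n);
    }
}
```

In this model an immediate resubscribe always fails, while `subscribeWithBackoff(5, 10)` succeeds on the fourth attempt, at 70 ms of simulated time. The probability helper shows why many partitions turn a rare race into near-certain restarts: assuming p = 0.01 and, for illustration, 1,000 partitions, (0.99)^1000 is roughly 4 × 10^-5.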
> I believe this may be a regression from
> [#59|https://github.com/apache/flink-connector-pulsar/pull/59]. The reason a
> separate consumer is needed for seeking is described in
> [PIP-194|https://github.com/apache/pulsar/issues/16757]: there appears to be
> no way to create a consumer with its cursor already positioned, and if we
> create the consumer first and _then_ call {{seek}}, some messages may leak
> through in between. StreamNative may know of another approach, but PIP-194
> does not appear to have been adopted or implemented, so we have to seek
> before creating the real consumer.
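The ordering problem behind PIP-194 can be illustrated with a similar toy model. `ToySubscription`, `SeekOrderDemo` and the "dispatched before seek" window are hypothetical simplifications, not the Pulsar consumer API; the model assumes that a freshly created consumer starts receiving from the subscription's current cursor right away.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a topic holding message ids 0..topicSize-1 and a server-side cursor. */
class ToySubscription {
    private final int topicSize;
    private int cursor; // next message id the broker would dispatch

    ToySubscription(int topicSize, int initialCursor) {
        this.topicSize = topicSize;
        this.cursor = initialCursor;
    }

    /** Moves the cursor; later dispatches start from the new position. */
    void seek(int position) {
        cursor = position;
    }

    /** Delivers up to count messages starting at the cursor and advances it. */
    List<Integer> dispatch(int count) {
        List<Integer> out = new ArrayList<>();
        while (out.size() < count && cursor < topicSize) {
            out.add(cursor++);
        }
        return out;
    }
}

class SeekOrderDemo {
    /** Subscribe first, seek afterwards: anything dispatched in between leaks to the reader. */
    static List<Integer> subscribeThenSeek(int topicSize, int initialCursor, int target,
                                           int dispatchedBeforeSeek, int readAfter) {
        ToySubscription sub = new ToySubscription(topicSize, initialCursor);
        // Window between creating the consumer and the seek taking effect.
        List<Integer> seen = new ArrayList<>(sub.dispatch(dispatchedBeforeSeek));
        sub.seek(target);
        seen.addAll(sub.dispatch(readAfter));
        return seen;
    }

    /** Seek first (via a short-lived consumer), then create the real consumer. */
    static List<Integer> seekThenSubscribe(int topicSize, int initialCursor, int target,
                                           int readAfter) {
        ToySubscription sub = new ToySubscription(topicSize, initialCursor);
        sub.seek(target);               // cursor is in place before the real consumer exists
        return sub.dispatch(readAfter); // so delivery starts exactly at target
    }
}
```

With a 10-message topic, the cursor at 0, a target of 5 and two messages dispatched in the window, `subscribeThenSeek(10, 0, 5, 2, 3)` returns `[0, 1, 5, 6, 7]` (messages 0 and 1 leak), while `seekThenSubscribe(10, 0, 5, 3)` returns `[5, 6, 7]`. The seek-first ordering is what forces the separate consumer and, with it, the Exclusive-subscription race.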
--
This message was sent by Atlassian Jira
(v8.20.10#820010)