Herbert Wang created FLINK-37663:
------------------------------------
Summary: Thread Synchronization Issue in
FutureCompletingBlockingQueue and Connector Split Fetching
Key: FLINK-37663
URL: https://issues.apache.org/jira/browse/FLINK-37663
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.18.1
Reporter: Herbert Wang
# Potential Thread Synchronization Issue in FutureCompletingBlockingQueue and
Connector Split Fetching (Flink 1.18)
**Context:** We are investigating potential sources of unresponsiveness or
deadlocks related to connector split fetching in Flink 1.18, and our analysis
points towards a possible synchronization issue within the
`FutureCompletingBlockingQueue`. We are not certain this is the definitive
cause, but we wanted to share our findings and reasoning for expert review.
## Suspected Problem Description
We suspect a potential thread synchronization issue exists in the
`FutureCompletingBlockingQueue`'s interaction with split fetchers, specifically
concerning the `wakeUpPuttingThread()` method. Our concern is that when this
method manually wakes up a putting thread, it signals the associated condition
variable but might not remove it from the internal `notFull` waiting queue.
This could potentially lead to an inconsistent state where subsequent
`signalNextPutter()` calls might signal an already-awoken or non-waiting
thread's condition, effectively causing a "lost signal" for a genuinely waiting
thread.
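To make the suspected asymmetry concrete, here is a minimal single-threaded sketch of the bookkeeping pattern we believe is at play (class and method names here are ours, not Flink's actual code): the manual wake-up path signals a putter's condition but leaves its entry in the `notFull` deque, while the normal signaling path polls an entry off the deque before signaling it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy model of the suspected bookkeeping; not Flink's actual implementation.
class PutterConditions {
    final ReentrantLock lock = new ReentrantLock();
    final Deque<Condition> notFull = new ArrayDeque<>();

    // Models waitOnPut() registering the putter's condition before it parks.
    Condition registerPutter() {
        Condition c = lock.newCondition();
        notFull.add(c);
        return c;
    }

    // Models wakeUpPuttingThread(): signals the condition, but never
    // removes its entry from notFull (the suspected inconsistency).
    void wakeUpPuttingThread(Condition c) {
        c.signal();
    }

    // Models signalNextPutter(): polls the head entry and signals it.
    void signalNextPutter() {
        Condition head = notFull.poll();
        if (head != null) {
            head.signal();
        }
    }
}
```

Under this model, a manual wake-up leaves a stale entry at the head of `notFull`, so the next `signalNextPutter()` call spends its one signal on a thread that is no longer waiting.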
## Hypothesized Scenario Illustrating the Concern
Consider the following sequence of events based on our understanding of the
code:
1. **Initial State**: The element queue (size 1) is full. A fetcher (Fetcher
A) has completed its task, fetched records, and is blocked attempting to
`put()` elements into the full queue.
* Fetcher A's thread calls `waitOnPut()` and goes to sleep.
* Its condition variable is added to the `notFull` queue.
2. **External Wake-up Trigger**: The `SplitFetcherManager` (or similar
component) calls `SplitFetcher.addSplits()`, which eventually leads to a task
being enqueued and a wake-up signal.
* Path: `addSplits` -> `enqueueTaskUnsafe` -> `wakeUpUnsafe` ->
`currentTask.wakeUp()` (assuming Fetcher A was executing `currentTask`).
3. **Fetcher Wake-up Path**:
* `FetchTask.wakeUp()` sets an internal `wakeup` flag to `true`.
* It then calls `elementsQueue.wakeUpPuttingThread(fetcherIndex)` for
Fetcher A.
4. **Potential Inconsistency Point**: Inside
`wakeUpPuttingThread(fetcherIndex)`:
* The corresponding `fetcherIndex`'s wake-up flag is set to `true`.
* The associated condition variable (Fetcher A's) is signaled.
* Fetcher A's thread wakes up from `waitOnPut()`, likely checks its
wake-up flag, and returns `false` from `put()`.
* **Key Concern**: Our analysis suggests that the condition variable for
Fetcher A might *remain* in the `notFull` queue at this point, as
`wakeUpPuttingThread` doesn't appear to remove it.
5. **Fetcher State Change**: Fetcher A, having returned from `put()` (possibly
due to the wake-up), might subsequently be closed or enter an idle state.
6. **New Fetcher Blocks**: A different fetcher (Fetcher B) becomes active,
fetches data, and attempts to `put()` elements into the queue, which is still
full.
* Fetcher B calls `waitOnPut()`.
* Its condition variable is added to the `notFull` queue (potentially
*after* Fetcher A's condition, if it remained).
* Fetcher B's thread goes to sleep.
7. **Consumer Action**: The source reader thread consumes an element from the
queue via `poll()`.
* Path: `getNextFetch()` -> `elementsQueue.poll()` -> `dequeue()`
8. **Signaling Attempt**: Since the queue was full and is now not full,
`dequeue()` calls `signalNextPutter()`.
* Path: `dequeue()` -> `signalNextPutter()` -> `notFull.poll().signal()`
9. **Potential Lost Signal**: **If** Fetcher A's condition remained in the
`notFull` queue (as suspected in step 4) and is at the head of the queue,
`notFull.poll()` will retrieve and signal Fetcher A's condition variable.
* This signal might be effectively lost because Fetcher A is no longer
waiting on that condition (it was woken up manually or might even be closed).
* Fetcher B, which *is* genuinely waiting for space, remains asleep
because its condition variable was not polled and signaled.
* This could lead to Fetcher B (and potentially others) never being woken
up, resulting in stalled data fetching or apparent deadlocks.
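The sequence above can be replayed deterministically against a toy model of the condition bookkeeping (all names below are hypothetical; this is a sketch of our reading of the code, not Flink's implementation). Steps 1-4 register Fetcher A and wake it manually, step 6 registers Fetcher B behind A's stale entry, and step 8's poll-and-signal then spends its signal on A while B's entry stays queued:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Deterministic replay of the hypothesized lost-signal sequence.
class LostSignalReplay {
    static final ReentrantLock lock = new ReentrantLock();
    static final Deque<Condition> notFull = new ArrayDeque<>();

    public static void main(String[] args) {
        lock.lock();
        try {
            // Step 1: Fetcher A blocks in put(); its condition joins notFull.
            Condition fetcherA = lock.newCondition();
            notFull.add(fetcherA);

            // Steps 2-4: A is woken manually; its entry is NOT removed.
            fetcherA.signal();

            // Step 6: Fetcher B blocks in put(); its condition joins notFull
            // behind A's stale entry.
            Condition fetcherB = lock.newCondition();
            notFull.add(fetcherB);

            // Steps 7-8: dequeue() frees a slot and signals the "next" putter.
            Condition signaled = notFull.poll();
            signaled.signal();

            // Step 9: the signal went to A's stale condition; B never wakes.
            System.out.println("signaled A's stale entry: " + (signaled == fetcherA));
            System.out.println("B still queued, unsignaled: " + (notFull.peek() == fetcherB));
        } finally {
            lock.unlock();
        }
    }
}
```

Note that `Condition.signal()` on a condition with no thread awaiting it is legal and is simply dropped, which is exactly the "lost signal" behavior described in step 9.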
## Suggested Area for Investigation / Potential Fix
Based on this hypothesis, the potential inconsistency seems to stem from
`wakeUpPuttingThread` not removing the condition from the `notFull` queue. If
this analysis is correct, a possible solution could involve ensuring the
condition is removed when a thread is woken up manually via this path. For
example, adding a line similar to:
```java
// Inside wakeUpPuttingThread, after retrieving caf:
notFull.remove(caf.condition()); // conceptual - requires correct implementation details
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)