Michał Misiewicz created FLINK-38096:
----------------------------------------
Summary: AsyncSinkWriter may hang when configured with a custom
rate limiting strategy
Key: FLINK-38096
URL: https://issues.apache.org/jira/browse/FLINK-38096
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.20.2, 1.20.1, 1.20.0, 2.0.0
Reporter: Michał Misiewicz
[AsyncSinkWriter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java]
may hang when using a custom rate-limiting strategy that can block new
requests even when no other requests are in flight. This can happen with
strategies that enforce rate limits, such as a cap on API requests per time interval.
Given the following RateLimitingStrategy implementation based on
[bucket4j|https://github.com/bucket4j/bucket4j]:
{code:scala}
package pl.zabka.cdp.common.ratelimit

import com.typesafe.scalalogging.LazyLogging
import io.github.bucket4j.Bucket
import org.apache.flink.connector.base.sink.writer.strategy.{
  RateLimitingStrategy,
  RequestInfo,
  ResultInfo
}

import java.io.Serializable

class TokenBucketRateLimitingStrategy(
    maxInFlightRequests: Int,
    tokensPerSecond: Long,
    tokensPerMinute: Long
) extends RateLimitingStrategy
    with LazyLogging
    with Serializable {

  @transient
  private var currentInFlightRequests = 0

  // Shared token bucket; TokenBucketProvider is not shown here
  @transient
  private lazy val bucket: Bucket = TokenBucketProvider.getInstance(
    "TokenBucketRateLimitingStrategy",
    tokensPerSecond,
    tokensPerMinute
  )

  // Blocks on the in-flight limit OR when the bucket lacks tokens for the batch;
  // the second condition can be true even when no requests are in flight.
  override def shouldBlock(requestInfo: RequestInfo): Boolean = {
    currentInFlightRequests >= maxInFlightRequests ||
    areTokensNotAvailable(requestInfo)
  }

  private def areTokensNotAvailable(requestInfo: RequestInfo): Boolean = {
    val batchSize = requestInfo.getBatchSize
    if (batchSize <= 0) {
      logger.debug(s"Received request with invalid batch size: $batchSize, allowing to proceed")
      return false
    }
    // estimateAbilityToConsume checks availability without consuming any tokens
    !bucket.estimateAbilityToConsume(batchSize).canBeConsumed
  }

  ...
}
{code}
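To make the failure mode concrete outside Flink, here is a minimal Java sketch of the same blocking rule. It swaps bucket4j for a hand-rolled bucket that refills to full capacity once per period, and the classes SimpleTokenBucket and SketchStrategy are hypothetical, not Flink or bucket4j APIs. It shows that shouldBlock can return true while zero requests are in flight, and that only the passage of time makes it return false again.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified token bucket (NOT bucket4j): refills to full
// capacity once a whole period has elapsed since the last refill.
class SimpleTokenBucket {
    private final long capacity;
    private final long refillPeriodMs;
    private final LongSupplier clockMs; // injectable clock keeps the sketch deterministic
    private long tokens;
    private long lastRefillMs;

    SimpleTokenBucket(long capacity, long refillPeriodMs, LongSupplier clockMs) {
        this.capacity = capacity;
        this.refillPeriodMs = refillPeriodMs;
        this.clockMs = clockMs;
        this.tokens = capacity;
        this.lastRefillMs = clockMs.getAsLong();
    }

    private void refillIfDue() {
        long now = clockMs.getAsLong();
        if (now - lastRefillMs >= refillPeriodMs) {
            tokens = capacity;
            lastRefillMs = now;
        }
    }

    // Plays the role of estimateAbilityToConsume(n).canBeConsumed: checks without consuming.
    boolean canConsume(long n) {
        refillIfDue();
        return tokens >= n;
    }

    // Assumes the caller checked canConsume(n) first.
    void consume(long n) {
        refillIfDue();
        tokens -= n;
    }
}

// Hypothetical strategy with the same shouldBlock shape as the Scala class above.
class SketchStrategy {
    private final int maxInFlightRequests;
    private final SimpleTokenBucket bucket;
    int currentInFlightRequests = 0;

    SketchStrategy(int maxInFlightRequests, SimpleTokenBucket bucket) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.bucket = bucket;
    }

    boolean shouldBlock(int batchSize) {
        // The token condition does not depend on in-flight requests at all.
        return currentInFlightRequests >= maxInFlightRequests
                || (batchSize > 0 && !bucket.canConsume(batchSize));
    }
}
```

For example, with a capacity of 10 tokens per 1000 ms, consuming all 10 tokens makes shouldBlock(5) return true while currentInFlightRequests is 0, and it returns false again only once the clock has advanced by 1000 ms.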
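AsyncSinkWriter may hang in
[mailboxExecutor.yield()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L356]
when all tokens have already been consumed and no requests are in flight.
yield() blocks until another mail is available in the mailbox. With nothing in
flight, no request completion will enqueue one, and the token bucket refills
over time without producing any mail, so nothing guarantees that the loop
below ever wakes up: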
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        // Blocks until the next mail; if no requests are in flight,
        // there may be no mail to wake this thread up.
        mailboxExecutor.yield();
        requestInfo = createRequestInfo();
    }
    ...
}{code}
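The following minimal Java model illustrates why the loop above can get stuck. ModelMailbox and ModelWriter are hypothetical classes, not Flink's MailboxExecutor; the model assumes, as in AsyncSinkWriter, that a completed request is handed back to the mailbox thread as a mail. A bounded poll stands in for yield() so that the example itself cannot hang.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a task mailbox; NOT Flink's MailboxExecutor.
class ModelMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    // Other threads (e.g. an async client finishing a request) hand work back as mail.
    void enqueue(Runnable mail) {
        mails.add(mail);
    }

    // Like yield(): waits for the next mail and runs it on the calling thread.
    // Unlike yield(), it gives up after timeoutMs and returns false, so the
    // demonstration terminates instead of hanging.
    boolean yieldWithTimeout(long timeoutMs) {
        Runnable mail;
        try {
            mail = mails.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }
}

// Hypothetical writer: the only source of mail is request completion.
class ModelWriter {
    final ModelMailbox mailbox = new ModelMailbox();
    int inFlightRequests = 0;

    // Simulates sending a request; in this model its completion arrives immediately as mail.
    void sendRequestAndComplete() {
        inFlightRequests++;
        mailbox.enqueue(() -> inFlightRequests--);
    }
}
```

With nothing in flight the mailbox stays empty and the bounded yield times out, which is exactly where the real yield() would keep waiting; once a request is in flight, its completion mail wakes the caller. Refilling the token bucket never shows up as mail in this model.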
We are observing Flink jobs hang during checkpointing; the jobs eventually
fail because the checkpoint times out.
Solution:
Ensure mailboxExecutor.yield() is only called when requests are in flight.
Replacing the call with the existing private method
[yieldIfThereExistsInFlightRequests()|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L443]
fixes the problem directly:
{code:java}
private void flush() throws InterruptedException {
    RequestInfo requestInfo = createRequestInfo();
    while (rateLimitingStrategy.shouldBlock(requestInfo)) {
        // Yields to the mailbox only while an in-flight request can still complete
        yieldIfThereExistsInFlightRequests();
        requestInfo = createRequestInfo();
    }
    ...
} {code}
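A minimal Java model of the patched loop, assuming (as its name suggests) that yieldIfThereExistsInFlightRequests() yields only when the in-flight request count is positive. PatchedFlushModel, its simulated clock and refillAtMs are hypothetical and exist only for this sketch; each loop iteration advances the clock by a fixed step to stand in for real time passing.

```java
// Hypothetical model of the patched flush() loop; names are illustrative, not Flink's.
class PatchedFlushModel {
    long nowMs = 0;            // simulated clock
    final long refillAtMs;     // time at which the token bucket refills
    int inFlightRequests = 0;  // 0 is the case that used to hang
    int yieldCalls = 0;        // how often the mailbox would have been yielded to

    PatchedFlushModel(long refillAtMs) {
        this.refillAtMs = refillAtMs;
    }

    // Tokens are unavailable until the simulated clock reaches refillAtMs.
    boolean shouldBlock() {
        return nowMs < refillAtMs;
    }

    // Mirrors the assumed behavior of yieldIfThereExistsInFlightRequests().
    void yieldIfThereExistsInFlightRequests() {
        if (inFlightRequests > 0) {
            yieldCalls++; // the real method would call mailboxExecutor.yield() here
        }
    }

    // Returns the number of loop iterations until shouldBlock() became false.
    int flush(long msPerIteration) {
        int iterations = 0;
        while (shouldBlock()) {
            yieldIfThereExistsInFlightRequests();
            nowMs += msPerIteration; // stand-in for wall-clock time between checks
            iterations++;
        }
        return iterations;
    }
}
```

With refillAtMs = 100 and a step of 10 ms, the loop exits after 10 iterations without ever yielding. The model also exposes a trade-off of the proposed change: while nothing is in flight, the loop re-evaluates shouldBlock without waiting, i.e. it busy-waits until the strategy unblocks.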
--
This message was sent by Atlassian Jira
(v8.20.10#820010)