Re: RemoteStorageManager.fetchLogSegment - how to deal with InterruptedException?
Hi,

Is there a possibility to keep fetching segments instead of cancelling the underlying request?

The reason I'm asking is that with the current implementation (I'm using the Aiven S3 plugin), consumers may never make any progress if fetch.max.wait.ms is set too low: they keep issuing fetch requests that never succeed.

At least having observability on this with a metric would make debugging easier.

F.

On Fri, Nov 17, 2023 at 2:17 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

> Hi Ivan,
>
> I've opened a relevant patch to increase the timeout to fetch data from
> remote storage. This will reduce the error occurrence rate:
>
> https://github.com/apache/kafka/pull/14778
>
> I think it's better to distinguish the cases:
>
> 1) Any error that happens while reading the remote data -- this should be
> logged at ERROR level and the failedRemoteReadRequestRate metric should
> reflect it.
> 2) When the task is cancelled due to timeout, then we can log it at the
> debug level and skip updating the metrics.
>
> On Fri, Nov 17, 2023 at 6:11 PM Ivan Yurchenko wrote:
>
> > Hello!
> >
> > `RemoteStorageManager.fetchLogSegment` is called in a background thread by
> > the broker [1]. When a fetch request times out, the associated Future is
> > cancelled [2] and the thread is interrupted. If the InterruptedException is
> > propagated from the `RemoteStorageManager`, it pollutes the broker logs
> > with not very useful exceptions and also metrics with false errors [3]. It
> > would be good to deal with this somehow. I see two options:
> >
> > 1. Add a `catch` to handle `InterruptedException` here [4] in a less noisy
> > way. Maybe log on the debug level + not increase the error metrics.
> > 2. Catch `InterruptedException` in the `RemoteStorageManager`
> > implementation and return an empty `InputStream`. It seems safe because it
> > goes this way [5]. In this case, this needs to be documented in the Javadoc.
> >
> > Personally, the first seems the better way. What does the community think
> > about this? Regardless of the decision, I volunteer to make a PR (and a KIP
> > if needed).
> > Thank you!
> >
> > Best,
> > Ivan
> >
> > [1] https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1457-L1459
> > [2] https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L79-L83
> > [3] https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68-L72
> > [4] https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68
> > [5] https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1276-L1278
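
To make the "keep fetching instead of cancelling" idea concrete, here is a rough sketch of what it could look like. It is not how the broker or any plugin works today: `InFlightRemoteReads`, `read`, the string key and the `timedOutWaits` counter are all made-up names, and the loader function merely stands in for a slow remote read. The point is only that a caller can stop waiting without cancelling the read, so a later request for the same key can pick up the result, and the counter gives the kind of visibility asked about above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper (not part of Kafka): shares one in-flight remote read per key. */
public class InFlightRemoteReads {
    // One future per key; a timed-out waiter leaves it running for the next caller.
    private final ConcurrentHashMap<String, CompletableFuture<byte[]>> inFlight =
            new ConcurrentHashMap<>();
    // Stand-in for the slow call into remote storage (assumption for this sketch).
    private final Function<String, byte[]> slowLoader;
    // Illustrative counter: how often a caller gave up waiting.
    final AtomicInteger timedOutWaits = new AtomicInteger();

    public InFlightRemoteReads(Function<String, byte[]> slowLoader) {
        this.slowLoader = slowLoader;
    }

    /** Waits at most maxWaitMs for the data; returns empty on timeout without cancelling the read. */
    public Optional<byte[]> read(String key, long maxWaitMs)
            throws InterruptedException, ExecutionException {
        // Start the read only if no read for this key is already running.
        CompletableFuture<byte[]> future = inFlight.computeIfAbsent(
                key, k -> CompletableFuture.supplyAsync(() -> slowLoader.apply(k)));
        try {
            byte[] data = future.get(maxWaitMs, TimeUnit.MILLISECONDS);
            inFlight.remove(key, future); // finished: the next read starts fresh
            return Optional.of(data);
        } catch (TimeoutException e) {
            timedOutWaits.incrementAndGet(); // observable, but not counted as a failure
            return Optional.empty();         // the read keeps running in the background
        } catch (ExecutionException e) {
            inFlight.remove(key, future);    // a real failure: allow a retry later
            throw e;
        }
    }
}
```

With something along these lines, a consumer whose fetch.max.wait.ms is shorter than the remote read time would still get the data on a later request, at the cost of holding a reader thread for reads nobody may come back for. That trade-off would need its own bound or expiry, which is left out here.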
Re: RemoteStorageManager.fetchLogSegment - how to deal with InterruptedException?
Hi Ivan,

I've opened a relevant patch to increase the timeout to fetch data from remote storage. This will reduce the error occurrence rate:

https://github.com/apache/kafka/pull/14778

I think it's better to distinguish the cases:

1) Any error that happens while reading the remote data -- this should be logged at ERROR level and the failedRemoteReadRequestRate metric should reflect it.
2) When the task is cancelled due to timeout, then we can log it at the debug level and skip updating the metrics.

On Fri, Nov 17, 2023 at 6:11 PM Ivan Yurchenko wrote:

> Hello!
>
> `RemoteStorageManager.fetchLogSegment` is called in a background thread by
> the broker [1]. When a fetch request times out, the associated Future is
> cancelled [2] and the thread is interrupted. If the InterruptedException is
> propagated from the `RemoteStorageManager`, it pollutes the broker logs
> with not very useful exceptions and also metrics with false errors [3]. It
> would be good to deal with this somehow. I see two options:
>
> 1. Add a `catch` to handle `InterruptedException` here [4] in a less noisy
> way. Maybe log on the debug level + not increase the error metrics.
> 2. Catch `InterruptedException` in the `RemoteStorageManager`
> implementation and return an empty `InputStream`. It seems safe because it
> goes this way [5]. In this case, this needs to be documented in the Javadoc.
>
> Personally, the first seems the better way. What does the community think
> about this? Regardless of the decision, I volunteer to make a PR (and a KIP
> if needed).
> Thank you!
>
> Best,
> Ivan
>
> [1] https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1457-L1459
> [2] https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L79-L83
> [3] https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68-L72
> [4] https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68
> [5] https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1276-L1278
RemoteStorageManager.fetchLogSegment - how to deal with InterruptedException?
Hello!

`RemoteStorageManager.fetchLogSegment` is called in a background thread by the broker [1]. When a fetch request times out, the associated Future is cancelled [2] and the thread is interrupted. If the InterruptedException is propagated from the `RemoteStorageManager`, it pollutes the broker logs with not very useful exceptions and also metrics with false errors [3]. It would be good to deal with this somehow. I see two options:

1. Add a `catch` to handle `InterruptedException` here [4] in a less noisy way. Maybe log on the debug level + not increase the error metrics.
2. Catch `InterruptedException` in the `RemoteStorageManager` implementation and return an empty `InputStream`. It seems safe because it goes this way [5]. In this case, this needs to be documented in the Javadoc.

Personally, the first seems the better way. What does the community think about this? Regardless of the decision, I volunteer to make a PR (and a KIP if needed).

Thank you!

Best,
Ivan

[1] https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1457-L1459
[2] https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L79-L83
[3] https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68-L72
[4] https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68
[5] https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1276-L1278
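
For illustration, option 1 could look roughly like the sketch below. It is not the actual `RemoteLogReader` code: `QuietRemoteReadTask`, the `outcome` field and the `failedReads` counter are hypothetical stand-ins (the counter plays the role of the failed-read metric), and `java.util.logging` is used only to keep the example self-contained. The idea is simply a separate `catch` for `InterruptedException` that logs quietly and leaves the error metric alone, while every other exception is still treated as a real failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for a remote read task; all names here are illustrative. */
public class QuietRemoteReadTask implements Runnable {
    private static final Logger LOG = Logger.getLogger(QuietRemoteReadTask.class.getName());

    private final Callable<byte[]> read;   // stand-in for the call into RemoteStorageManager
    private final AtomicLong failedReads;  // stand-in for the failed-read metric
    volatile String outcome = "pending";   // exposed only so the sketch is easy to check

    public QuietRemoteReadTask(Callable<byte[]> read, AtomicLong failedReads) {
        this.read = read;
        this.failedReads = failedReads;
    }

    @Override
    public void run() {
        try {
            read.call();
            outcome = "ok";
        } catch (InterruptedException e) {
            // The task was cancelled (e.g. the fetch timed out): log at a debug-like
            // level, do not touch the error metric, and restore the interrupt flag.
            LOG.log(Level.FINE, "Remote read was cancelled", e);
            Thread.currentThread().interrupt();
            outcome = "cancelled";
        } catch (Exception e) {
            // Any other exception is a genuine read failure: log loudly and count it.
            LOG.log(Level.SEVERE, "Remote read failed", e);
            failedReads.incrementAndGet();
            outcome = "failed";
        }
    }
}
```

One caveat with this approach: a plugin may wrap the interruption in another exception type (for example an I/O exception raised by an interrupted client call), in which case a plain `catch (InterruptedException e)` would not see it and the cause chain would need checking too.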