Re: RemoteStorageManager.fetchLogSegment - how to deal with InterruptedException?

2023-11-17 Thread Francois Visconte
Hi,

Is there a possibility to keep fetching segments instead of cancelling the
underlying request?
The reason I'm asking is that with the current implementation (I'm using the
Aiven S3 plugin), there is a possibility that consumers never make any
progress if fetch.max.wait.ms is set too low: they keep issuing fetch
requests that never succeed.
At least having observability on this with a metric would make debugging
easier.
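
To make the scenario concrete, here is a minimal sketch of a consumer
configured with a fetch.max.wait.ms shorter than a typical remote read. This
is an assumed example, not my actual setup: the broker address, group id, and
the 100 ms value are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LowFetchWaitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tiered-reader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // If this is lower than the time the broker needs to pull the segment
        // from object storage, every fetch can be cancelled before the remote
        // read completes and the consumer never makes progress.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Subscribe and poll as usual; reads of tiered (remote) offsets
            // are the ones affected.
        }
    }
}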

F.

On Fri, Nov 17, 2023 at 2:17 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Hi Ivan,
>
> I've opened a relevant patch to increase the timeout to fetch data from
> remote storage. This will reduce the error occurrence rate:
>
> https://github.com/apache/kafka/pull/14778
>
> I think it's better to distinguish the cases:
>
> 1) Any error that happens while reading the remote data -- this should be
> logged at ERROR level and the failedRemoteReadRequestRate metric should
> reflect it.
> 2) When the task is cancelled due to timeout, we can log it at the debug
> level and skip updating the metrics.
>
> On Fri, Nov 17, 2023 at 6:11 PM Ivan Yurchenko  wrote:
>
> > Hello!
> >
> > `RemoteStorageManager.fetchLogSegment` is called in a background thread
> > by the broker [1]. When a fetch request times out, the associated Future
> > is cancelled [2] and the thread is interrupted. If the
> > InterruptedException is propagated from the `RemoteStorageManager`, it
> > pollutes the broker logs with unhelpful exceptions and inflates the error
> > metrics with false errors [3]. It would be good to deal with this
> > somehow. I see two options:
> >
> > 1. Add a `catch` to handle `InterruptedException` here [4] in a less
> > noisy way. Maybe log at the debug level and do not increase the error
> > metrics.
> > 2. Catch `InterruptedException` in the `RemoteStorageManager`
> > implementation and return an empty `InputStream`. It seems safe given
> > the code path it follows [5]. In this case, this needs to be documented
> > in the Javadoc.
> >
> > Personally, the first seems a better way. What does the community think
> > about this? Regardless of the decision, I volunteer to make a PR (and a
> > KIP if needed).
> > Thank you!
> >
> > Best,
> > Ivan
> >
> > [1]
> > https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1457-L1459
> > [2]
> > https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L79-L83
> > [3]
> > https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68-L72
> > [4]
> > https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68
> > [5]
> > https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1276-L1278
> >
> >
>


Re: RemoteStorageManager.fetchLogSegment - how to deal with InterruptedException?

2023-11-17 Thread Kamal Chandraprakash
Hi Ivan,

I've opened a relevant patch to increase the timeout to fetch data from
remote storage. This will reduce the error occurrence rate:

https://github.com/apache/kafka/pull/14778

I think it's better to distinguish the cases:

1) Any error that happens while reading the remote data -- this should be
logged at ERROR level and the failedRemoteReadRequestRate metric should
reflect it.
2) When the task is cancelled due to timeout, we can log it at the debug
level and skip updating the metrics; a rough sketch of this distinction
follows below.
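
For illustration only, here is a self-contained sketch of that branching. The
class, field, and log lines are hypothetical stand-ins: this is not the actual
RemoteLogReader, and the real failedRemoteReadRequestRate is a Kafka metrics
sensor, not a LongAdder.

import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.LongAdder;

public class RemoteReadTask implements Callable<InputStream> {

    // Stand-in for the failedRemoteReadRequestRate sensor.
    private final LongAdder failedRemoteReadRequests = new LongAdder();

    // Stand-in for the call into RemoteStorageManager.fetchLogSegment.
    private final Callable<InputStream> remoteRead;

    public RemoteReadTask(Callable<InputStream> remoteRead) {
        this.remoteRead = remoteRead;
    }

    @Override
    public InputStream call() throws Exception {
        try {
            return remoteRead.call();
        } catch (InterruptedException e) {
            // Case 2: the delayed fetch timed out and cancelled the future.
            // Log quietly and leave the error metrics untouched; the caller
            // has already given up on the result.
            System.out.println("DEBUG remote read cancelled: " + e);
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            // Case 1: a genuine failure while reading remote data.
            System.err.println("ERROR reading data from remote storage: " + e);
            failedRemoteReadRequests.increment();
            throw e;
        }
    }
}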

On Fri, Nov 17, 2023 at 6:11 PM Ivan Yurchenko  wrote:

> Hello!
>
> `RemoteStorageManager.fetchLogSegment` is called in a background thread by
> the broker [1]. When a fetch request times out, the associated Future is
> cancelled [2] and the thread is interrupted. If the InterruptedException is
> propagated from the `RemoteStorageManager`, it pollutes the broker logs with
> unhelpful exceptions and inflates the error metrics with false errors [3].
> It would be good to deal with this somehow. I see two options:
>
> 1. Add a `catch` to handle `InterruptedException` here [4] in a less noisy
> way. Maybe log at the debug level and do not increase the error metrics.
> 2. Catch `InterruptedException` in the `RemoteStorageManager`
> implementation and return an empty `InputStream`. It seems safe given the
> code path it follows [5]. In this case, this needs to be documented in the
> Javadoc.
>
> Personally, the first seems a better way. What does the community think
> about this? Regardless of the decision, I volunteer to make a PR (and a KIP
> if needed).
> Thank you!
>
> Best,
> Ivan
>
> [1]
> https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1457-L1459
> [2]
> https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L79-L83
> [3]
> https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68-L72
> [4]
> https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68
> [5]
> https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1276-L1278
>
>


RemoteStorageManager.fetchLogSegment - how to deal with InterruptedException?

2023-11-17 Thread Ivan Yurchenko
Hello!

`RemoteStorageManager.fetchLogSegment` is called in a background thread by the
broker [1]. When a fetch request times out, the associated Future is cancelled
[2] and the thread is interrupted. If the InterruptedException is propagated
from the `RemoteStorageManager`, it pollutes the broker logs with unhelpful
exceptions and inflates the error metrics with false errors [3]. It would be
good to deal with this somehow. I see two options:

1. Add a `catch` to handle `InterruptedException` here [4] in a less noisy way.
Maybe log at the debug level and do not increase the error metrics.
2. Catch `InterruptedException` in the `RemoteStorageManager` implementation
and return an empty `InputStream`. It seems safe given the code path it follows
[5]. In this case, this needs to be documented in the Javadoc (a sketch of this
option follows right after this list).
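
As a sketch of option 2: the class and helper below are hypothetical, the
parameter types are simplified placeholders rather than the real interface,
and only the idea of catching the interrupt inside the plugin and returning an
empty stream comes from this message.

import java.io.InputStream;

public class RemoteStorageManagerOption2Sketch {

    // Shape loosely modelled on fetchLogSegment; not the actual signature.
    public InputStream fetchLogSegment(Object segmentMetadata, int startPosition) {
        try {
            return downloadRange(segmentMetadata, startPosition);
        } catch (InterruptedException e) {
            // The broker cancelled the delayed fetch and interrupted this
            // thread. Restore the interrupt flag and return an empty stream;
            // nobody is waiting for the data anymore. This behaviour would
            // have to be documented in the RemoteStorageManager Javadoc.
            Thread.currentThread().interrupt();
            return InputStream.nullInputStream();
        }
    }

    // Placeholder for the actual object-storage download.
    private InputStream downloadRange(Object segmentMetadata, int startPosition)
            throws InterruptedException {
        return InputStream.nullInputStream();
    }
}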

Personally, the first seems a better way. What does the community think about 
this? Regardless of the decision, I volunteer to make a PR (and a KIP if 
needed).
Thank you!

Best,
Ivan

[1] 
https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1457-L1459
[2] 
https://github.com/apache/kafka/blob/6f197301646135e0bb39a461ca0a07c09c3185fb/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L79-L83
[3] 
https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68-L72
[4] 
https://github.com/apache/kafka/blob/8aaf7daff393f2b26438fb7fe28016e06e23558c/core/src/main/java/kafka/log/remote/RemoteLogReader.java#L68
[5] 
https://github.com/apache/kafka/blob/61fb83e20280ed3ac83de8290dd9815bdb7efcea/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1276-L1278
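
P.S. For anyone who wants to reproduce the interruption path in [2] in
isolation, here is a minimal, self-contained sketch (assumed example, not
Kafka code): cancelling a Future with mayInterruptIfRunning=true interrupts
the worker thread, and that interrupt is what the RemoteStorageManager
implementation ends up seeing.

import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelInterruptsTask {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Stand-in for the remote read: a task that blocks for longer than
        // the fetch request is willing to wait.
        Future<InputStream> remoteRead = pool.submit(() -> {
            try {
                Thread.sleep(60_000); // pretend we are downloading a segment
                return InputStream.nullInputStream();
            } catch (InterruptedException e) {
                // This is the exception an RSM implementation observes when
                // the broker gives up on the delayed remote fetch.
                System.out.println("remote read interrupted: " + e);
                throw e;
            }
        });

        TimeUnit.MILLISECONDS.sleep(500); // the fetch-level timeout elapses
        remoteRead.cancel(true);          // cancel the Future and interrupt the thread

        pool.shutdown();
    }
}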