Re: Kafka Consumer Retries Failing

2021-07-19 Thread Rahul Patwari
Hi Piotrek,

I was just about to update.
You are right. The issue is caused by a stalled task manager due to high
heap usage, and the high heap usage is caused by a memory leak in a
library we are using.
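
For reference, one way to surface such GC stalls is to enable GC logging on
the Flink JVMs via env.java.opts in flink-conf.yaml - a minimal sketch,
assuming JDK 8 style flags and a writable log path:

  # flink-conf.yaml - enable verbose GC logging (JDK 8 flags; log path is a placeholder)
  env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/opt/flink/log/gc.log"

Long "Total time for which application threads were stopped" entries around
the timeout timestamps would point to GC stalls rather than the network.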

Thanks for your help.

On Mon, Jul 19, 2021 at 8:31 PM Piotr Nowojski  wrote:

> Thanks for the update.
>
> > Could the backpressure timeout and heartbeat timeout be because of Heap
> Usage close to Max configured?
>
> Could be. This is one of the things I had in mind under overloaded in:
>
> > might be related to one another via some different deeper problem
> (broken network environment, something being overloaded)
>
> You can easily diagnose it. Just attach a memory profiler or check gc
> logs, just as you would normally do when debugging a non-Flink standalone
> Java application.
>
> It can also be a symptom of a failing network environment. I would first
> check for GC pauses/stops/gaps in the logs that would indicate a stalled JVM
> caused those RPC timeouts. If that doesn't bring you closer to a solution, I
> would then check the network environment in your cluster/cloud. Both of
> those might be a reason behind your Kafka issues. Hard to tell. Definitely
> you shouldn't have heartbeat timeouts in your cluster, so something IS
> wrong with your setup.
>
> Best,
> Piotrek
>
> On Thu, Jul 15, 2021 at 5:17 PM Rahul Patwari
> wrote:
>
>> Thanks for the feedback Piotrek.
>>
>> We have observed the issue again today. As we are using Flink 1.11.1, I
>> tried to check the backpressure of Kafka source tasks from the
>> Jobmanager UI.
>> The backpressure request was canceled due to Timeout and "No Data" was
>> displayed in UI. Here are the respective logs:
>>
>> java.util.concurrent.TimeoutException: Invocation of public abstract
>> java.util.concurrent.CompletableFuture
>> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
>> timed out.
>> at
>> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
>> .
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
>> after [15000 ms]. Message of type
>> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
>> reason for `AskTimeoutException` is that the recipient actor didn't send a
>> reply.
>> at
>> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>> .
>>
>> During this time, the heartbeat of one of the Taskmanager to the
>> Jobmanager timed out. Here are the respective logs:
>>
>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>> bead57c15b447eac08531693ec91edc4 timed out. at
>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
>> ..
>>
>> Because of heartbeat timeout, there was an internal restart of Flink and
>> the Kafka consumption rate recovered after the restart.
>>
>> Could the backpressure timeout and heartbeat timeout be because of Heap
>> Usage close to Max configured?
>>
>> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi Rahul,
>>>
>>> I would highly doubt that you are hitting the network bottleneck case.
>>> It would require either a broken environment/network or throughputs in
>>> orders of GB/second. More likely you are seeing empty input pool and you
>>> haven't checked the documentation [1]:
>>>
>>> > inPoolUsage - An estimate of the input buffers usage. (ignores
>>> LocalInputChannels)
>>>
>>> If local channels are backpressured, inPoolUsage will be 0. You can
>>> check downstream task's inputQueueLength or isBackPressured metrics.
>>> Besides that, I would highly recommend upgrading to Flink 1.13.x if you are
>>> investigating backpressure problems as described in the blog post.
>>>
>>> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
>>> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>>
>>> No, I don't think it's possible. Those two might be related to one
>>> another via some different deeper problem (broken network environment,
>>> something being ove

Re: Kafka Consumer Retries Failing

2021-07-15 Thread Rahul Patwari
Thanks for the feedback Piotrek.

We have observed the issue again today. As we are using Flink 1.11.1, I
tried to check the backpressure of Kafka source tasks from the
Jobmanager UI.
The backpressure request was canceled due to a timeout and "No Data" was
displayed in the UI. Here are the respective logs:

java.util.concurrent.TimeoutException: Invocation of public abstract
java.util.concurrent.CompletableFuture
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
timed out.
at
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
.
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
after [15000 ms]. Message of type
[org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't send a
reply.
at
akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
.

During this time, the heartbeat of one of the TaskManagers to the JobManager
timed out. Here are the respective logs:

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
bead57c15b447eac08531693ec91edc4 timed out. at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
..

Because of heartbeat timeout, there was an internal restart of Flink and
the Kafka consumption rate recovered after the restart.

Could the backpressure timeout and heartbeat timeout be because of heap
usage being close to the configured maximum?
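
As a side note, the back pressure sampling can also be triggered outside the
UI through the JobManager's REST API - a sketch, with placeholder host, job,
and vertex ids:

  # Trigger/fetch a back pressure sample for one job vertex
  curl http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
  # The response reports a back-pressure level per subtask; since it samples the
  # task threads, it can time out in the same way when a TaskManager is stalled.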

On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski  wrote:

> Hi Rahul,
>
> I would highly doubt that you are hitting the network bottleneck case. It
> would require either a broken environment/network or throughputs in orders
> of GB/second. More likely you are seeing empty input pool and you haven't
> checked the documentation [1]:
>
> > inPoolUsage - An estimate of the input buffers usage. (ignores
> LocalInputChannels)
>
> If local channels are backpressured, inPoolUsage will be 0. You can check
> downstream task's inputQueueLength or isBackPressured metrics. Besides
> that, I would highly recommend upgrading to Flink 1.13.x if you are
> investigating backpressure problems as described in the blog post.
>
> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>
> No, I don't think it's possible. Those two might be related to one another
> via some different deeper problem (broken network environment, something
> being overloaded), but I don't see a way how one could cause the other.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service
>
> On Wed, Jul 14, 2021 at 2:18 PM Rahul Patwari
> wrote:
>
>> Thanks, Piotrek.
>>
>> We have two Kafka sources. We are facing this issue for both of them. The
>> downstream tasks with the sources form two independent directed acyclic
>> graphs, running within the same Streaming Job.
>>
>> For Example:
>> source1 -> task1 -> sink1
>> source2 -> task2 -> sink2
>>
>> There is backpressure in both sources. Verified using the
>> "isBackPressured" metric.
>> For one of the sources, "outPoolUsage" is high whereas "inPoolUsage" of the
>> immediate downstream task is ~ 0. I think we are observing the rare case
>> mentioned at the end of [1].
>>
>> I have a couple of questions:
>>
>>1. Can the backpressure Cause "DisconnectException", "Error Sending
>>Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>2. What could be the next steps in resolving the backpressure issue -
>>the rare case
>>
>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>>
>> When the stream is running as expected, these are the thread dumps of the
>> Kafka Source tasks. Comparing the thread dumps - the "Kafka Fetcher"
>> thread, which polls records, is blocked by the "Legacy Source" thread (main
>> thread) - probably because of backpressure.
>>
>>   {
>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (8/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventSignature (8/12)\" Id=521 RUNNABLE\n\tat
>> 

Re: java.lang.Exception: Could not complete the stream element: Record @ 1626200691540 :

2021-07-15 Thread Rahul Patwari
Hi Ragini,

AsyncDataStream.unorderedWait() and AsyncDataStream.orderedWait() take a
timeout as a parameter, with the TimeUnit for the timeout specified as
another parameter.

The timeout() method above is called when an Async I/O operation times
out. The default behavior is to raise an exception when an operation times
out. It seems like you have overridden the timeout() method to discard the
record for which the operation timed out.

If records are not getting processed after some time, probably all
operations are timing out.

I would suggest looking at the timeout configuration provided to
AsyncDataStream.unorderedWait() or AsyncDataStream.orderedWait() and
identifying why there are timeouts, rather than overriding the timeout() method.
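
For illustration, a minimal sketch of where the timeout is set (the class,
stream, and values below are placeholders, not your actual job):

  import java.util.Collections;
  import java.util.concurrent.TimeUnit;
  import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.functions.async.ResultFuture;
  import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

  // Hypothetical enrichment function - types and lookup logic are placeholders.
  public class MyAsyncLookup extends RichAsyncFunction<String, String> {

      @Override
      public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
          // issue the async call and eventually call resultFuture.complete(...)
      }

      // If you do override timeout(), still complete the future; otherwise the
      // operator's in-flight queue can fill up and stall the pipeline.
      @Override
      public void timeout(String input, ResultFuture<String> resultFuture) {
          resultFuture.complete(Collections.emptyList()); // drop the record explicitly
      }
  }

  // 30 seconds timeout per request, at most 100 in-flight requests:
  DataStream<String> enriched = AsyncDataStream.unorderedWait(
          inputStream, new MyAsyncLookup(), 30, TimeUnit.SECONDS, 100);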

Regards,
Rahul

On Wed, Jul 14, 2021 at 9:19 PM Ragini Manjaiah 
wrote:

> Hi,
> According to the suggestion, I overrode the timeout method in the async
> function. The Flink job processes real-time events for a few minutes and
> later hangs and does not process at all. Is there any issue with the method
> below? I see 0 records per second. Can you please help here?
>
> @Override
> public void timeout(Tuple1>> fieldMapTup,
> ResultFuture>, Map, List<
> Map>>> resultFuture) {
> //Timed out. Just discard
> System.out.println("Timeout:" );
>
> On Wed, Jul 14, 2021 at 9:40 AM Rahul Patwari 
> wrote:
>
>> Hi Ragini,
>>
>> From the stack trace, the job failed as the Async I/O Operator has timed
>> out for an event.
>> The timeout is configurable.
>>
>> Please refer
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>>
>> Quoting from above documentation:
>>
>>> Timeout: The timeout defines how long an asynchronous request may take
>>> before it is considered failed. This parameter guards against dead/failed
>>> requests.
>>
>>
>> Regards,
>> Rahul
>>
>> On Wed, Jul 14, 2021 at 9:29 AM Ragini Manjaiah <
>> ragini.manja...@gmail.com> wrote:
>>
>>> Hi,
>>> I am facing the below issue while processing streaming events. In what
>>> scenarios is java.lang.Exception: Could not complete the stream element
>>> hit? Can you please help me here? The job fails after this exception is hit.
>>>
>>>
>>> 2021-07-13 13:24:58,781 INFO  
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>   [] - ExtractTransformProcess -> AsyncFetch (9/60)
>>> (3809c73da162e9ab3b7b131077aff105) switched from RUNNING to FAILED on
>>> container_e3767_1626191679189_5049_01  _03 @ brdn6162.target.com
>>> (dataPort=41107).
>>>
>>>
>>> java.lang.Exception: Could not complete the stream element: Record @
>>> 1626200691540 : ({})
>>>
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:363)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.api.functions.async.AsyncFunction.timeout(AsyncFunction.java:97)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.lambda$processElement$0(AsyncWaitOperator.java:190)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1223)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1214)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301)
>>> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>>>
>>> at
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
>>> ~[appspayload-decoupleHbaseWri

Re: Kafka Consumer Retries Failing

2021-07-14 Thread Rahul Patwari
java:755)\n\tat
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
}

On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski  wrote:

> Hi,
>
> Waiting for memory from LocalBufferPool is a perfectly normal symptom of a
> backpressure [1][2].
>
> Best,
> Piotrek
>
> [1] https://flink.apache.org/2021/07/07/backpressure.html
> [2] https://www.ververica.com/blog/how-flink-handles-backpressure
>
> On Wed, Jul 14, 2021 at 6:05 AM Rahul Patwari
> wrote:
>
>> Thanks, David, Piotr for your reply.
>>
>> I managed to capture the Thread dump from Jobmanaager UI for few task
>> managers.
>> Here is the thread dump for Kafka Source tasks in one task manager. I
>> could see the same stack trace in other task managers as well. It seems
>> like Kafka Source tasks are waiting on Memory. Any Pointers?
>>
>>   {
>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventTransition (6/12)\" Id=581 WAITING on 
>> java.lang.Object@444c0edc\n\tat
>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>> java.lang.Object@444c0edc\n\tat
>> java.lang.Object.wait(Object.java:502)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>> }, {
>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventSignature (7/12)\" Id=580 WAITING on 
>> java.lang.Object@7d3843a9\n\tat
>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>> java.lang.Object@7d3843a9\n\tat
>> java.lang.Object.wait(Object.java:502)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>> }, {
>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>> (7/12)",
>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>> SourceEventSignature (7/12)\" Id=408 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>> }, {
>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>> (6/12)",
>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>> SourceEventTransition (6/12)\" Id=409 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>> }
>>
>> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski 
>> wrote:
>>
>&g

Re: java.lang.Exception: Could not complete the stream element: Record @ 1626200691540 :

2021-07-13 Thread Rahul Patwari
Hi Ragini,

From the stack trace, the job failed as the Async I/O Operator timed
out for an event.
The timeout is configurable.

Please refer to
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api

Quoting from above documentation:

> Timeout: The timeout defines how long an asynchronous request may take
> before it is considered failed. This parameter guards against dead/failed
> requests.


Regards,
Rahul

On Wed, Jul 14, 2021 at 9:29 AM Ragini Manjaiah 
wrote:

> Hi,
> I am facing the below issue while processing streaming events. In what
> scenarios is java.lang.Exception: Could not complete the stream element
> hit? Can you please help me here? The job fails after this exception is hit.
>
>
> 2021-07-13 13:24:58,781 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   [] - ExtractTransformProcess -> AsyncFetch (9/60)
> (3809c73da162e9ab3b7b131077aff105) switched from RUNNING to FAILED on
> container_e3767_1626191679189_5049_01  _03 @ brdn6162.target.com
> (dataPort=41107).
>
>
> java.lang.Exception: Could not complete the stream element: Record @
> 1626200691540 : ({})
>
>
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:363)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.api.functions.async.AsyncFunction.timeout(AsyncFunction.java:97)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.lambda$processElement$0(AsyncWaitOperator.java:190)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1223)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1214)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> ~[appspayload-decoupleHbaseWrite-1.0-SNAPSHOT-all.jar:?]
>
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
>
> Caused by: java.util.concurrent.TimeoutException: Async function call has
> timed out.
>
>
>
> Thanks & regards
>


Re: Kafka Consumer Retries Failing

2021-07-13 Thread Rahul Patwari
Thanks, David and Piotr, for your reply.

I managed to capture the thread dump from the Jobmanager UI for a few task
managers.
Here is the thread dump for the Kafka Source tasks in one task manager. I could
see the same stack trace in other task managers as well. It seems like the
Kafka Source tasks are waiting on memory. Any pointers?

  {
"threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
"stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventTransition
(6/12)\" Id=581 WAITING on java.lang.Object@444c0edc\n\tat
java.lang.Object.wait(Native Method)\n\t-  waiting on
java.lang.Object@444c0edc\n\tat
java.lang.Object.wait(Object.java:502)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
}, {
"threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
"stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature
(7/12)\" Id=580 WAITING on java.lang.Object@7d3843a9\n\tat
java.lang.Object.wait(Native Method)\n\t-  waiting on
java.lang.Object@7d3843a9\n\tat
java.lang.Object.wait(Object.java:502)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
}, {
"threadName": "Legacy Source Thread - Source: SourceEventSignature (7/12)",
"stringifiedThreadInfo": "\"Legacy Source Thread - Source:
SourceEventSignature (7/12)\" Id=408 WAITING on
java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
}, {
"threadName": "Legacy Source Thread - Source: SourceEventTransition (6/12)",
"stringifiedThreadInfo": "\"Legacy Source Thread - Source:
SourceEventTransition (6/12)\" Id=409 WAITING on
java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
}

On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski  wrote:

> Hi,
>
> I'm not sure, maybe someone will be able to help you, but it sounds like
> it would be better for you to:
> - google search something like "Kafka Error sending fetch request
> TimeoutException" (I see there are quite a lot of results, some of them
> might be related)
> - ask this question on the Kafka mailing list
> - ask this question on stackoverflow as a Kafka question
>
> In short, FlinkKafkaConsumer is a very thin wrapper around the
> KafkaConsumer class, so the thing you are observing most likely has very
> little to do with Flink itself. In other words, if you are observing
> such a problem, you would most likely be able to reproduce it without
> Flink.
>
> Best,
> Piotrek
>
> On Fri, Jul 9, 2021 at 12:30 PM Rahul Patwari
> wrote:
>
>> Hi,
>>
>> We have a Flink 1.11.1 Version streaming pipeline in production which
>> reads from Kafka.
>> Kafka Server version is 2.5.0 - confluent 5.5.0
>> Kafka Client Version is 2.4.1 - 
>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
>> version: 2.4.1","method":""}
>>
>

Kafka Consumer Retries Failing

2021-07-09 Thread Rahul Patwari
Hi,

We have a Flink 1.11.1 Version streaming pipeline in production which reads
from Kafka.
Kafka Server version is 2.5.0 - confluent 5.5.0
Kafka Client Version is 2.4.1 -
{"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
version: 2.4.1","method":""}

Occasionally (every 6 to 12 hours), we have observed that the Kafka
consumption rate went down (NOT to 0) and the following logs were observed.
Generally, the consumption rate across all consumers is 4k records/sec.
When this issue occurred, the consumption rate dropped to < 50 records/sec:

org.apache.kafka.common.errors.DisconnectException: null

{"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
fetch request (sessionId=405798138, epoch=5808) to node 8:
{}.","method":"handleError"}

org.apache.kafka.common.errors.TimeoutException: Failed

{"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid,
will attempt rediscovery","method":"markCoordinatorUnknown"}

{"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
coordinator 100.98.40.16:9092 (id: 2147483623 rack:
null)","method":"onSuccess"}

The consumers retried for more than an hour, but the above logs were observed
again.
The consumers started pulling data only after a manual restart.

No WARN or ERROR logs were observed in Kafka or Zookeeper during this
period.

Our observation from this incident is that Kafka Consumer retries could not
resolve the issue, but a manual restart (or a Flink internal restart with the
failure-rate restart policy) does.
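
(For context, the internal restarts come from the failure-rate restart
strategy; a sketch of its flink-conf.yaml configuration - the values here are
illustrative, not our production settings:)

  restart-strategy: failure-rate
  restart-strategy.failure-rate.max-failures-per-interval: 3
  restart-strategy.failure-rate.failure-rate-interval: 5 min
  restart-strategy.failure-rate.delay: 10 s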

Has anyone faced this issue before? Any pointers are appreciated.

Regards,
Rahul


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Rahul Patwari
Hi Arvid,

Thanks for your inputs. They are super helpful.

Why do you need the window operator at all? Couldn't you just backpressure
> on the async I/O by delaying the processing there?
>

I haven't explored this approach. Wouldn't the backpressure get propagated
upstream and affect the consumption rate from Kafka?

What's keeping you from attaching the offset of the Kafka records to A, B,
> C and write the offset when writing the record into the sink? (Probably
> need to wrap your sink function into a delegating sink function)
>

I am new to delegating sinks. Can you please help me understand how this
approach helps replay records from Kafka in case of failures or recovery?
Currently, I am using the auto offset commit of the Kafka Consumer, which I
think commits offsets every 5 seconds by default.
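
To make sure I understand, is it something like the following sketch? (All
names here are placeholders, and the offset store is an assumption, not an
existing API.)

  import java.io.Serializable;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  import org.apache.flink.streaming.api.functions.sink.SinkFunction;

  // Hypothetical store for "processed up to" offsets/timestamps, e.g. backed by Cassandra.
  interface OffsetStore extends Serializable {
      void save(long offsetOrTimestamp);
  }

  // Delegating sink: write the record first, then persist its Kafka offset.
  public class OffsetTrackingSink<T> extends RichSinkFunction<Tuple2<T, Long>> {
      private final SinkFunction<T> delegate;   // the real sink
      private final OffsetStore offsetStore;

      public OffsetTrackingSink(SinkFunction<T> delegate, OffsetStore offsetStore) {
          this.delegate = delegate;
          this.offsetStore = offsetStore;
      }

      @Override
      public void invoke(Tuple2<T, Long> value, Context context) throws Exception {
          delegate.invoke(value.f0, context);   // write the record to the real sink
          offsetStore.save(value.f1);           // only then persist its offset/timestamp
      }
  }

A complete version would also forward open()/close() and the runtime context
to the delegate, and the offset would have to be attached to each record
upstream (e.g. in the Kafka deserialization schema).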

Checkpointing will definitely solve the problem. But, replaying records
from Kafka seems like a simpler approach to guarantee "at least once"
processing throughout the life of the stateless pipeline. Replaying records
from Kafka also seems like a simpler approach operationally.

Is there no other way to guarantee "at least once" processing without
checkpointing?

Checkpointing seems like the only approach to guarantee "at least
once"/"exactly once" processing for stateful pipelines. But support for
replaying records to guarantee "at least once" processing would be helpful.

I know that checkpointing has an "at least once" mode. Perhaps one more mode
could be added where records are replayed from the Source and the state is not
checkpointed - just a suggestion. What are your thoughts? Such a mode, where
only the Kafka offsets are checkpointed, would be very helpful in this case.

Thanks,
Rahul


On Tue, Apr 13, 2021 at 7:20 PM Arvid Heise  wrote:

> Hi Rahul,
>
> This pipeline should process millions of records per day with low latency.
>> I am avoiding Checkpointing, as the records in the Window operator and
>> in-flight records in the Async I/O operator are persisted along with the
>> Kafka offsets. But the records in Window and Async I/O operators can be
>> obtained just by replaying the records from Kafka Source, which is the
>> approach I want to take.
>> There is Deduplication logic in the pipeline. So, I am preferring to
>> replay records in case of failures rather than storing the incremental
>> snapshots.
>>
> Did you measure how much data is being checkpointed? A few million records
> per day should just be a few megabytes per checkpoint.
> It sounds to me as if you would rather want your approach because it's
> conceptually more sound but not because it is necessary.
>
> I'm asking because delaying records and using async I/O is increasing your
> latency significantly anyways. So a couple of additional ms to the
> checkpoint don't sound like it will invalidate your use case to me. It will
> also be cheaper if you factor in your work time and will also work if you
> ever extend your pipeline to hold state.
> Recovery should also not be worse because you restore the records from
> blob storage instead of fetching it from the source system.
>
> Also let me repeat these questions
>
>> 1. Why do you need the window operator at all? Couldn't you just
>> backpressure on the async I/O by delaying the processing there?
>>
> 2. What's keeping you from attaching the offset of the Kafka records to A,
>> B, C and write the offset when writing the record into the sink? (Probably
>> need to wrap your sink function into a delegating sink function)
>>
>
>
>
>
> On Tue, Apr 13, 2021 at 12:33 PM Rahul Patwari 
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the reply.
>>
>> could you please help me to understand how the at least once guarantee
>>> would work without checkpointing in your case?
>>>
>>
>> This was the plan to maintain "at least once" guarantee:
>> Logic at Sink:
>> The DataStream on which Sink Function is applied, on the same DataStream,
>> apply a windowing operator and compute the min of the timestamps of records and
>> persist the timestamp in Cassandra -> This will persist the record
>> timestamp below which all the records are processed, say, every 10 seconds.
>> (The delay created by the Windowing Operator used here makes sure that the
>> timestamp is persisted in Cassandra only after it is written to Sink)
>> Note: There is another Windowing Operator at the Source to delay the
>> processing of records.
>>
>> Logic at Source:
>> While creating the JobGraph, read the timestamp persisted in Cassandra
>> for each topic and configure the start position of Kafka Consumer.
>>
>> T

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Rahul Patwari
Hi Arvid,

Thanks for the reply.

could you please help me to understand how the at least once guarantee
> would work without checkpointing in your case?
>

This was the plan to maintain the "at least once" guarantee:
Logic at Sink:
On the same DataStream to which the Sink Function is applied, apply a
windowing operator, compute the min of the timestamps of the records, and
persist that timestamp in Cassandra -> this persists the record timestamp
below which all the records have been processed, say, every 10 seconds.
(The delay created by the windowing operator used here makes sure that the
timestamp is persisted in Cassandra only after the records are written to the Sink.)
Note: There is another windowing operator at the Source to delay the
processing of records.

Logic at Source:
While creating the JobGraph, read the timestamp persisted in Cassandra for
each topic and configure the start position of Kafka Consumer.

The problem is that the start positions are not respected when there are
automatic restarts during failures. Basically, we wanted to read the
timestamp from Cassandra and start consuming from that timestamp even in
the case of automatic restarts during failures.

This pipeline should process millions of records per day with low latency.
I am avoiding Checkpointing, as the records in the Window operator and
in-flight records in the Async I/O operator are persisted along with the
Kafka offsets. But the records in Window and Async I/O operators can be
obtained just by replaying the records from Kafka Source, which is the
approach I want to take.
There is deduplication logic in the pipeline, so I prefer to replay
records in case of failures rather than storing incremental snapshots.

Thanks,
Rahul

On Tue, Apr 13, 2021 at 2:53 PM Arvid Heise  wrote:

> Hi Rahul,
>
> could you please help me to understand how the at least once guarantee
> would work without checkpointing in your case?
>
> Let's say you read records A, B, C. You use a window to delay processing,
> so let's say A passes and B, C are still in the window for the trigger.
>
> Now do you want to auto commit the offset of A after it being written in
> the sink? If so, what's keeping you from attaching the offset of the Kafka
> records to A, B, C and write the offset when writing the record into the
> sink? (Probably need to wrap your sink function into a delegating sink
> function)
>
> The way Flink does the checkpointing is that it checkpoints the offset of
> C and the state of the window (containing B, C) to avoid data loss. Why is
> that not working for you? Which state size do you expect?
>
> Why do you need the window operator at all? Couldn't you just backpressure
> on the async I/O by delaying the processing there? Then there would be no
> need to change anything.
>
> On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan 
> wrote:
>
>> Hi Rahul,
>>
>> Right. There are no workarounds as far as I know.
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
>>  wrote:
>> >
>> > Hi Roman, Arvid,
>> >
>> > So, to achieve "at least once" guarantee, currently, automatic restart
>> of Flink should be disabled?
>> > Is there any workaround to get "at least once" semantics with Flink
>> Automatic restarts in this case?
>> >
>> > Regards,
>> > Rahul
>> >
>> > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Thanks for the clarification.
>> >>
>> >> > Other than managing offsets externally, Are there any other ways to
>> guarantee "at least once" processing without enabling checkpointing?
>> >>
>> >> That's currently not possible, at least with the default connector.
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>> >>  wrote:
>> >> >
>> >> > Hi Roman,
>> >> >
>> >> > Thanks for the reply.
>> >> > This is what I meant by Internal restarts - Automatic restore of
>> Flink Job from a failure. For example, pipeline restarts when Fixed delay
>> or Failure Rate restart strategies are configured.
>> >> >
>> >> > Quoting documentation in this link - Configuring Kafka Consumer
>> start position configuration
>> >> >
>> >> >> Note that these start position configuration methods do not affect
>> the start position when the job is automatically restored from a failure
>> >> >
>> >> >
>&

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hi Roman, Arvid,

So, to achieve "at least once" guarantee, currently, automatic restart of
Flink should be disabled?
Is there any workaround to get "at least once" semantics with Flink
Automatic restarts in this case?

Regards,
Rahul

On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan  wrote:

> Hi,
>
> Thanks for the clarification.
>
> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
>
> That's currently not possible, at least with the default connector.
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>  wrote:
> >
> > Hi Roman,
> >
> > Thanks for the reply.
> > This is what I meant by Internal restarts - Automatic restore of Flink
> Job from a failure. For example, pipeline restarts when Fixed delay or
> Failure Rate restart strategies are configured.
> >
> > Quoting documentation in this link - Configuring Kafka Consumer start
> position configuration
> >
> >> Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure
> >
> >
> >
> > It seems that there will be data loss even when offsets are managed
> externally when there are pipeline restarts due to a failure, say, an
> exception. On the other hand, when the pipeline is stopped and
> resubmitted(say, an upgrade), there won't be any data loss as offsets are
> retrieved from an external store and configured while starting Kafka
> Consumer.
> >
> > We do not want to enable checkpointing as the pipeline is stateless. We
> have Deduplication logic in the pipeline and the processing is idempotent.
> >
> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
> >
> > Thanks,
> > Rahul
> >
> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Could you please explain what you mean by internal restarts?
> >>
> >> If you commit offsets or timestamps from sink after emitting records
> >> to the external system then there should be no data loss.
> >> Otherwise (if you commit offsets earlier), you have to persist
> >> in-flight records to avoid data loss (i.e. enable checkpointing).
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
> >>  wrote:
> >> >
> >> > Hello,
> >> >
> >> > Context:
> >> >
> >> > We have a stateless Flink Pipeline which reads from Kafka topics.
> >> > The pipeline has a Windowing operator(Used only for introducing a
> delay in processing records) and AsyncI/O operators (used for
> Lookup/Enrichment).
> >> >
> >> > "At least Once" Processing semantics is needed for the pipeline to
> avoid data loss.
> >> >
> >> > Checkpointing is disabled and we are dependent on the auto offset
> commit of Kafka consumer for fault tolerance currently.
> >> >
> >> > As auto offset commit indicates that "the record is successfully
> read", instead of "the record is successfully processed", there will be
> data loss if there is a restart when the offset is committed to Kafka but
> not successfully processed by the Flink Pipeline, as the record is NOT
> replayed again when the pipeline is restarted.
> >> >
> >> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >> >
> >> > Question:
> >> >
> >> > We are looking for other ways to guarantee "at least once" processing
> without checkpointing. One such way is to manage Kafka Offsets Externally.
> >> >
> >> > We can maintain offsets of each partition of each topic in
> Cassandra(or maintain timestamp, where all records with timestamps less
> than this timestamp are successfully processed) and configure Kafka
> consumer Start Position - setStartFromTimestamp() or
> setStartFromSpecificOffsets()
> >> >
> >> > This will be helpful if the pipeline is manually restarted (say,
> JobManager pod is restarted). But, how to avoid data loss in case of
> internal restarts?
> >> >
> >> > Has anyone used this approach?
> >> > What are other ways to guarantee "at least once" processing without
> checkpointing for a stateless Flink pipeline?
> >> >
> >> > Thanks,
> >> > Rahul
>


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hi Roman,

Thanks for the reply.
This is what I meant by Internal restarts - Automatic restore of Flink Job
from a failure. For example, pipeline restarts when Fixed delay
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html#fixed-delay-restart-strategy>
or Failure Rate
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html#failure-rate-restart-strategy>
restart strategies are configured.

Quoting documentation in this link - Configuring Kafka Consumer start
position configuration
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration>


Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure



It seems that there will be data loss even when offsets are managed
externally when there are pipeline restarts due to a failure, say, an
exception. On the other hand, when the pipeline is stopped and
resubmitted (say, for an upgrade), there won't be any data loss, as offsets are
retrieved from an external store and configured while starting the Kafka
Consumer.

We do not want to enable checkpointing as the pipeline is stateless. We
have Deduplication logic in the pipeline and the processing is idempotent.

Other than managing offsets externally, Are there any other ways to
guarantee "at least once" processing without enabling checkpointing?

Thanks,
Rahul

On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan  wrote:

> Hi,
>
> Could you please explain what you mean by internal restarts?
>
> If you commit offsets or timestamps from sink after emitting records
> to the external system then there should be no data loss.
> Otherwise (if you commit offsets earlier), you have to persist
> in-flight records to avoid data loss (i.e. enable checkpointing).
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>  wrote:
> >
> > Hello,
> >
> > Context:
> >
> > We have a stateless Flink Pipeline which reads from Kafka topics.
> > The pipeline has a Windowing operator(Used only for introducing a delay
> in processing records) and AsyncI/O operators (used for Lookup/Enrichment).
> >
> > "At least Once" Processing semantics is needed for the pipeline to avoid
> data loss.
> >
> > Checkpointing is disabled and we are dependent on the auto offset commit
> of Kafka consumer for fault tolerance currently.
> >
> > As auto offset commit indicates that "the record is successfully read",
> instead of "the record is successfully processed", there will be data loss
> if there is a restart when the offset is committed to Kafka but not
> successfully processed by the Flink Pipeline, as the record is NOT replayed
> again when the pipeline is restarted.
> >
> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >
> > Question:
> >
> > We are looking for other ways to guarantee "at least once" processing
> without checkpointing. One such way is to manage Kafka Offsets Externally.
> >
> > We can maintain offsets of each partition of each topic in Cassandra(or
> maintain timestamp, where all records with timestamps less than this
> timestamp are successfully processed) and configure Kafka consumer Start
> Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
> >
> > This will be helpful if the pipeline is manually restarted (say,
> JobManager pod is restarted). But, how to avoid data loss in case of
> internal restarts?
> >
> > Has anyone used this approach?
> > What are other ways to guarantee "at least once" processing without
> checkpointing for a stateless Flink pipeline?
> >
> > Thanks,
> > Rahul
>


Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hello,

*Context*:

We have a stateless Flink Pipeline which reads from Kafka topics.
The pipeline has a Windowing operator (used only for introducing a delay in
processing records) and Async I/O operators (used for Lookup/Enrichment).

"At least Once" Processing semantics is needed for the pipeline to avoid
data loss.

Checkpointing is disabled, and we currently depend on the auto offset commit
of the Kafka consumer for fault tolerance.

As auto offset commit indicates that "the record is successfully read",
instead of "the record is successfully processed", there will be data loss
if there is a restart when the offset is committed to Kafka but not
successfully processed by the Flink Pipeline, as the record is NOT replayed
again when the pipeline is restarted.

Checkpointing can solve this problem. But, since the pipeline is stateless,
we do not want to use checkpointing, which will persist all the records in
Windowing Operator and in-flight Async I/O calls.

*Question*:

We are looking for other ways to guarantee "at least once" processing
without checkpointing. One such way is to manage Kafka Offsets Externally.

We can maintain offsets of each partition of each topic in Cassandra (or
maintain a timestamp, where all records with timestamps less than this
timestamp are successfully processed) and configure the Kafka consumer start
position:

- setStartFromTimestamp() or setStartFromSpecificOffsets()
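
For example, with externally stored per-partition offsets - a sketch, where
the topic, partitions, and offset values are placeholders (kafkaProps being
the usual consumer Properties):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

  Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
  specificStartOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
  specificStartOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);

  FlinkKafkaConsumer<String> consumer =
          new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);
  // Caveat: this start position is not applied when the job is automatically
  // restored from a failure.
  consumer.setStartFromSpecificOffsets(specificStartOffsets);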

This will be helpful if the pipeline is manually restarted (say, the JobManager
pod is restarted). *But how do we avoid data loss in case of internal
restarts?*

Has anyone used this approach?
What are other ways to guarantee "at least once" processing without
checkpointing for a stateless Flink pipeline?

Thanks,
Rahul


Re: sporadic "Insufficient no of network buffers" issue

2020-08-02 Thread Rahul Patwari
After debugging further, it seems like this issue is caused by the scheduling
strategy.
Depending on the tasks assigned to a task manager, the amount of memory
configured for network buffers is probably running out.

Through these references: FLINK-12122
<https://issues.apache.org/jira/browse/FLINK-12122>, FLINK-15031
<https://issues.apache.org/jira/browse/FLINK-15031>, and the Flink 1.10 release
notes
<https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.10.html>,
we came to know that the scheduling strategy changed in 1.5.0 (FLIP-6) compared
to 1.4.2, and that since 1.9.2 the previous behavior can be restored via a
configuration option for the scheduling strategy -
cluster.evenly-spread-out-slots: true
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#cluster-evenly-spread-out-slots>

"Spread out" strategy could definitely help in this case.
can you please confirm our findings and probably suggest some possible ways
to mitigate this issue.
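
That is, the change we are evaluating is just this flink-conf.yaml entry
(available from 1.9.2 / 1.10, per the references above):

  cluster.evenly-spread-out-slots: true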

Rahul

On Sat, Aug 1, 2020 at 9:24 PM Rahul Patwari 
wrote:

> From the metrics in Prometheus, we observed that the minimum
> AvailableMemorySegments out of all the task managers is 4.5k when the
> exception was thrown.
> So there were enough network buffers.
> correction to the configs provided above: each TM CPU has 8 cores.
>
> Apart from having fewer network buffers, can something else trigger this
> issue?
> Also, is it expected that the issue is sporadic?
>
> Rahul
>
> On Sat, Aug 1, 2020 at 12:24 PM Ivan Yang  wrote:
>
>> Yes, increase the taskmanager.network.memory.fraction in your case. Also
>> reduce the parallelism will reduce number of network buffer required for
>> your job. I never used 1.4.x, so don’t know about it.
>>
>> Ivan
>>
>> On Jul 31, 2020, at 11:37 PM, Rahul Patwari 
>> wrote:
>>
>> Thanks for your reply, Ivan.
>>
>> I think taskmanager.network.memory.max is by default 1GB.
>> In my case, the network buffers memory is 13112 * 32768 = around 400MB
>> which is 10% of the TM memory as by default
>> taskmanager.network.memory.fraction is 0.1.
>> Do you mean to increase taskmanager.network.memory.fraction?
>>
>>1. If Flink is upgraded from 1.4.2 to 1.8.2 does the application
>>need more network buffers?
>>2. Can this issue happen sporadically? sometimes this issue is not
>>seen when the job manager is restarted.
>>
>> I am thinking whether having fewer network buffers is the root cause (or)
>> if the root cause is something else which triggers this issue.
>>
>> On Sat, Aug 1, 2020 at 9:36 AM Ivan Yang  wrote:
>>
>>> Hi Rahul,
>>>
>>> Try to increase taskmanager.network.memory.max to 1GB, basically double
>>> what you have now. However, you only have 4GB RAM for the entire TM, seems
>>> out of proportion to have 1GB network buffer with 4GB total RAM. Reducing
>>> number of shuffling will require less network buffer. But if your job need
>>> the shuffling, then you may consider to add more memory to TM.
>>>
>>> Thanks,
>>> Ivan
>>>
>>> On Jul 31, 2020, at 2:02 PM, Rahul Patwari 
>>> wrote:
>>>
>>> Hi,
>>>
>>> We are observing "Insufficient number of Network Buffers" issue
>>> Sporadically when Flink is upgraded from 1.4.2 to 1.8.2.
>>> The state of the tasks with this issue translated from DEPLOYING to
>>> FAILED.
>>> Whenever this issue occurs, the job manager restarts. Sometimes, the
>>> issue goes away after the restart.
>>> As we are not getting the issue consistently, we are in a dilemma of
>>> whether to change the memory configurations or not.
>>>
>>> Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
>>> The exception says that 13112 no. of network buffers are present, which
>>> is 6x the recommendation.
>>>
>>> Is reducing the no. of shuffles the only way to reduce the no. of
>>> network buffers required?
>>>
>>> Thanks,
>>> Rahul
>>>
>>> configs:
>>> env: Kubernetes
>>> Flink: 1.8.2
>>> using default configs for memory.fraction, memory.min, memory.max.
>>> using 8 TM, 8 slots/TM
>>> Each TM is running with 1 core, 4 GB Memory.
>>>
>>> Exception:
>>> java.io.IOException: Insufficient number of network buffers: required 2,
>>> but only 0 available. The total number of network buffers is currently set
>>> to 13112 of 32768 bytes each. You can increase this number by setting the
>>> configuration keys 'taskmanager.netwo

Re: sporadic "Insufficient no of network buffers" issue

2020-08-01 Thread Rahul Patwari
From the metrics in Prometheus, we observed that the minimum
AvailableMemorySegments across all the task managers was 4.5k when the
exception was thrown.
So there were enough network buffers.
Correction to the configs provided above: each TM has 8 CPU cores.

Apart from having fewer network buffers, can something else trigger this
issue?
Also, is it expected that the issue is sporadic?

Rahul

On Sat, Aug 1, 2020 at 12:24 PM Ivan Yang  wrote:

> Yes, increase the taskmanager.network.memory.fraction in your case. Also
> reduce the parallelism will reduce number of network buffer required for
> your job. I never used 1.4.x, so don’t know about it.
>
> Ivan
>
> On Jul 31, 2020, at 11:37 PM, Rahul Patwari 
> wrote:
>
> Thanks for your reply, Ivan.
>
> I think taskmanager.network.memory.max is by default 1GB.
> In my case, the network buffers memory is 13112 * 32768 = around 400MB
> which is 10% of the TM memory as by default
> taskmanager.network.memory.fraction is 0.1.
> Do you mean to increase taskmanager.network.memory.fraction?
>
>1. If Flink is upgraded from 1.4.2 to 1.8.2 does the application
>need more network buffers?
>2. Can this issue happen sporadically? sometimes this issue is not
>seen when the job manager is restarted.
>
> I am thinking whether having fewer network buffers is the root cause (or)
> if the root cause is something else which triggers this issue.
>
> On Sat, Aug 1, 2020 at 9:36 AM Ivan Yang  wrote:
>
>> Hi Rahul,
>>
>> Try to increase taskmanager.network.memory.max to 1GB, basically double
>> what you have now. However, you only have 4GB RAM for the entire TM, seems
>> out of proportion to have 1GB network buffer with 4GB total RAM. Reducing
>> number of shuffling will require less network buffer. But if your job need
>> the shuffling, then you may consider to add more memory to TM.
>>
>> Thanks,
>> Ivan
>>
>> On Jul 31, 2020, at 2:02 PM, Rahul Patwari 
>> wrote:
>>
>> Hi,
>>
>> We are observing "Insufficient number of Network Buffers" issue
>> Sporadically when Flink is upgraded from 1.4.2 to 1.8.2.
>> The state of the tasks with this issue translated from DEPLOYING to
>> FAILED.
>> Whenever this issue occurs, the job manager restarts. Sometimes, the
>> issue goes away after the restart.
>> As we are not getting the issue consistently, we are in a dilemma of
>> whether to change the memory configurations or not.
>>
>> Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
>> The exception says that 13112 no. of network buffers are present, which
>> is 6x the recommendation.
>>
>> Is reducing the no. of shuffles the only way to reduce the no. of network
>> buffers required?
>>
>> Thanks,
>> Rahul
>>
>> configs:
>> env: Kubernetes
>> Flink: 1.8.2
>> using default configs for memory.fraction, memory.min, memory.max.
>> using 8 TM, 8 slots/TM
>> Each TM is running with 1 core, 4 GB Memory.
>>
>> Exception:
>> java.io.IOException: Insufficient number of network buffers: required 2,
>> but only 0 available. The total number of network buffers is currently set
>> to 13112 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
>> at
>> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
>> at
>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>


Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Rahul Patwari
Thanks for your reply, Ivan.

I think taskmanager.network.memory.max is by default 1GB.
In my case, the network buffers memory is 13112 * 32768 = around 400MB,
which is 10% of the TM memory, as taskmanager.network.memory.fraction is 0.1
by default.
Do you mean to increase taskmanager.network.memory.fraction?

   1. If Flink is upgraded from 1.4.2 to 1.8.2, does the application
   need more network buffers?
   2. Can this issue happen sporadically? Sometimes this issue is not seen
   when the job manager is restarted.

I am wondering whether having fewer network buffers is the root cause, or
if the root cause is something else which triggers this issue.

On Sat, Aug 1, 2020 at 9:36 AM Ivan Yang  wrote:

> Hi Rahul,
>
> Try to increase taskmanager.network.memory.max to 1GB, basically double
> what you have now. However, you only have 4GB RAM for the entire TM, seems
> out of proportion to have 1GB network buffer with 4GB total RAM. Reducing
> number of shuffling will require less network buffer. But if your job need
> the shuffling, then you may consider to add more memory to TM.
>
> Thanks,
> Ivan
>
> On Jul 31, 2020, at 2:02 PM, Rahul Patwari 
> wrote:
>
> Hi,
>
> We are observing "Insufficient number of Network Buffers" issue
> Sporadically when Flink is upgraded from 1.4.2 to 1.8.2.
> The state of the tasks with this issue translated from DEPLOYING to
> FAILED.
> Whenever this issue occurs, the job manager restarts. Sometimes, the issue
> goes away after the restart.
> As we are not getting the issue consistently, we are in a dilemma of
> whether to change the memory configurations or not.
>
> Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
> The exception says that 13112 no. of network buffers are present, which is
> 6x the recommendation.
>
> Is reducing the no. of shuffles the only way to reduce the no. of network
> buffers required?
>
> Thanks,
> Rahul
>
> configs:
> env: Kubernetes
> Flink: 1.8.2
> using default configs for memory.fraction, memory.min, memory.max.
> using 8 TM, 8 slots/TM
> Each TM is running with 1 core, 4 GB Memory.
>
> Exception:
> java.io.IOException: Insufficient number of network buffers: required 2,
> but only 0 available. The total number of network buffers is currently set
> to 13112 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
> at
> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
> at
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
> at java.lang.Thread.run(Thread.java:748)
>
>
>


sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Rahul Patwari
Hi,

We are observing the "Insufficient number of Network Buffers" issue
sporadically after Flink was upgraded from 1.4.2 to 1.8.2.
The state of the tasks with this issue transitioned from DEPLOYING to FAILED.
Whenever this issue occurs, the job manager restarts. Sometimes, the issue
goes away after the restart.
As we are not hitting the issue consistently, we are in a dilemma about
whether to change the memory configurations or not.

Min recommended no. of network buffers (#slots-per-TM^2 * #TMs * 4):
(8 * 8) * 8 * 4 = 2048
The exception says that 13112 network buffers are present, which is
~6x the recommendation.

Is reducing the no. of shuffles the only way to reduce the no. of network
buffers required?

Thanks,
Rahul

configs:
env: Kubernetes
Flink: 1.8.2
using default configs for memory.fraction, memory.min, memory.max.
using 8 TM, 8 slots/TM
Each TM is running with 1 core, 4 GB Memory.

Exception:
java.io.IOException: Insufficient number of network buffers: required 2,
but only 0 available. The total number of network buffers is currently set
to 13112 of 32768 bytes each. You can increase this number by setting the
configuration keys 'taskmanager.network.memory.fraction',
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
at
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
at
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:748)
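
For reference, the keys named in the exception live in flink-conf.yaml; a
sketch of raising them (the values below are illustrative only):

  # flink-conf.yaml (Flink 1.8) - give the network stack a larger share of memory
  taskmanager.network.memory.fraction: 0.2   # default is 0.1
  taskmanager.network.memory.min: 128mb
  taskmanager.network.memory.max: 1gb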