Hi Isidoros,

I’m not sure in which kind of way the timeout and the high back pressure are 
related, but I think we can try to resolve the request timeout issue first. You 
can take a look at the request log on Kafka broker and see if the request was 
received by broker, and how long it takes for broker to handle it. By default 
the request log is on WARN level, and you may want to increase it to DEBUG or 
TRACE to reveal more information. 

Another thought in my mind is about the content of the record, since you 
mentioned extremely high back pressure after the disconnection issue. If some 
messages are quite large or complex, it might block the network or require more 
resources to make the serde, even burden some operator in the pipeline and 
finally lead to back pressure. Once the back pressure happens in the pipeline, 
you can try to locate the operator causing the back pressure and make some 
analysis to see why the throughput drops, or dump the record to see if there’s 
something special in it. 

Hope these could be helpful! 

Best regards, 

Qingsheng

> On Mar 23, 2022, at 19:19, Isidoros Ioannou <akis3...@gmail.com> wrote:
> 
> Hi, we are running flink 1.13.2 version on Kinesis Analytics. Our source is a 
> kafka topic with one partition so far and we are using the FlinkKafkaConsumer 
> (kafka-connector-1.13.2) 
> Sometimes we get some errors from the consumer like the below:
> 
> "locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> "logger": "org.apache.kafka.clients.FetchSessionHandler",
> "message": "[Consumer 
> clientId=consumer-realtime-analytics-eu-production-node2-2, 
> groupId=realtime-analytics-eu-production-node2] Error sending fetch request 
> (sessionId=1343463307, epoch=172059) to node 3: 
> org.apache.kafka.common.errors.DisconnectException.",
>     "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map -> 
> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> 
> With the debug logging it appeared that this happens due to request timeout 
> so I have increased the request.timeout.ms to 60000 , however it did not 
> resolve the issue. Even if I get the disconnection I can see that after 1s 
> the consumer sends a successful fetchRequest.
> 
> The problem we have noticed is that after the disconnection the application 
> stays behind from processing. the backpressure on the source gets 100% and 
> the app consumes events at a lower rate even if we do not have much traffic 
> to cope with. 
> 
> We use eventTime and the watermarks are not generated in the consumer since 
> we have one partition. the source is the following
> 
> DataStream<ServerAwareJournal> stream =
>                 
> env.addSource(consumerBase).name("Kafka").uid("Kafka").filter(f -> 
> !f.getServerId().equals("Demo150")).keyBy(ServerAwareJournal::getServerId);
> 
> and then we assign the following watermark: 
> 
> WatermarkStrategy.<ServerAwareJournal 
> >forBoundedOutOfOrderness(Duration.ofSeconds(3))
>             .withTimestampAssigner((element, recordTimestamp) -> 
> element.getMessage().getDateTime().atZone(journalTimezone).toInstant()
>                     .toEpochMilli()).withIdleness(Duration.ofMinutes(1));
> 
> the upstream operators are 10 cep operators with a parallelism of 15 and then 
> there is a union of the data emitted from the CEP operators and added to 
> firehose sink.
> Another thing is that we ran two parallel instances of the same application 
> i.e two kinesis analytics nodes (one for debug purposes), the debug node has 
> checkpointing disabled.
> 
> Could you please give me some advice on where to look to find a solution to 
> this issue?
> Thanks in advance
> 
> 

Reply via email to