Re: Throttling/effective back-pressure on a Kafka sink

2019-05-30 Thread Derek VerLee

  
  
Was any progress ever made on this? We have seen the same issue in the past. What I do remember is that whatever value I set max.block.ms to is when the job crashes. I am going to attempt to reproduce the issue again and will report back.

Re: Throttling/effective back-pressure on a Kafka sink

2019-03-28 Thread Konstantin Knauf
Hi Marc,

The Kafka producer should be able to create back-pressure. Could you try increasing max.block.ms to Long.MAX_VALUE?
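
A minimal sketch of that change, for reference, assuming the (topic, schema,
properties) constructor of the universal FlinkKafkaProducer from
flink-connector-kafka; the broker list, topic name and SimpleStringSchema are
placeholders, and the four overrides are the ones listed further down in the
quoted message:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class KafkaSinkSketch {

        // Builds the Kafka sink with the producer overrides from the report
        // plus the suggested max.block.ms change.
        static FlinkKafkaProducer<String> buildSink(String brokers) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", brokers);

            // Overrides mentioned in the original message.
            props.setProperty("compression.type", "snappy");
            props.setProperty("max.in.flight.requests.per.connection", "1");
            props.setProperty("acks", "all");
            props.setProperty("client.dns.lookup", "use_all_dns_ips");

            // Suggested change: allow send() to block for as long as needed when
            // the producer cannot accept more records (buffer full or metadata
            // unavailable), so the blocked sink back-pressures the pipeline.
            props.setProperty("max.block.ms", String.valueOf(Long.MAX_VALUE));

            // Placeholder topic and serialization schema.
            return new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props);
        }
    }

The sink would then be attached with the usual stream.addSink(buildSink(...)).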

The exceptions you shared for the failure case don't look like the root
causes of the problem. Could you share the full stack traces or even the full
logs for this time frame? Feel free to send these logs to me directly if you
don't want to share them on the list.

Best,

Konstantin




On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding  wrote:

> Hi
>
> We’ve got a job producing to a Kafka sink. The Kafka topics have a
> retention of 2 weeks. When doing a complete replay, it seems like Flink
> isn’t able to back-pressure or throttle the amount of messages going to
> Kafka, causing the following error:
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Expiring 8396 record(s) for topic-1:12 ms has
> passed since batch creation
>
> We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka
> cluster is running version 2.1.1. The Kafka producer uses all default
> settings except for:
>
> compression.type = snappy
> max.in.flight.requests.per.connection = 1
> acks = all
> client.dns.lookup = use_all_dns_ips
>
> I tried playing around with the buffer and batch settings, increasing
> timeouts, but none seem to be what we need. Increasing the
> delivery.timeout.ms and request.timeout.ms solves the initial error, but
> causes the Flink job to fail entirely due to:
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>
> My assumption is that the Kafka producer will start blocking since it
> notices that it can't handle the batches, and Flink eventually runs out of
> buffers for the operator.
>
> What really baffles me is that the backpressure tab shows that everything
> is OK. The entire job pipeline (which reads from 4 different topics, unions
> them all and sinks towards 1 topic) pushes all the messages through to the
> sink stage, resulting in 18 million incoming stage messages, even though
> there is no way Kafka can keep up with this.
>
> I searched for others facing the same issue but can't find anything
> similar. I'm hoping that someone here could guide me in the right direction.
>
> Thanks in advance
>
>
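
For concreteness, the two timeouts mentioned in the quoted message map to these
producer properties (the values are purely illustrative, reusing the props
object from the sketch above; note that the producer enforces
delivery.timeout.ms >= linger.ms + request.timeout.ms):

    // Upper bound on how long a single produce request may wait for the broker.
    props.setProperty("request.timeout.ms", "120000");
    // Upper bound on the total time a record may spend in the producer,
    // including retries, before it is expired ("Expiring ... record(s)").
    props.setProperty("delivery.timeout.ms", "600000");

Raising these only gives batches more time before they expire; it is the
max.block.ms change above that makes send() block, and thereby back-pressure
the sink, once the producer's buffer fills up.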

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen