Hello,
  I’m building a POC for a Flink app that loads data from Kafka into a 
Greenplum data warehouse.  I’ve built a custom sink that bulk loads a batch of 
rows, and I’m using a global window, triggered on record count, to collect the 
rows for each sink invocation.  I’m finding that while the sink is running, the 
upstream operators of my app stop processing messages until the sink invocation 
completes.  I assume this is the backpressure logic kicking in.  The cost is 
that I get about 60% of the theoretically possible throughput.  Is there any 
configuration that would let me control that backpressure so that Flink buffers 
rows while the sink is busy?  Ideally, when one sink invocation completes, the 
next batch of rows is already waiting for it to pick up.  Here’s my code:

        env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
                .keyBy((KeySelector<Envelope, String>) value -> value.getPayload().getSource().getTable())
                .window(GlobalWindows.create())
                // fire every 5000 records, or after at most 1 minute, and purge the window contents
                .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
                .aggregate(new ListAggregator())
                .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");
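
To make concrete what I mean by "the next batch is ready immediately": roughly, 
I’d like the sink to hand each batch off to a background writer so that invoke() 
returns right away and the upstream operators keep filling the next window while 
the previous bulk load is still running.  Something like the sketch below — 
BufferedCopySink, copyRows(), and the queue size are made-up placeholders, not 
my actual CopySink:

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sketch, not my real CopySink: hand each batch to a background
// writer thread so invoke() returns immediately and upstream operators keep
// filling the next window while the previous bulk load is still running.
public class BufferedCopySink<T> extends RichSinkFunction<List<T>> {

    // Holds a couple of batches waiting behind the one being written;
    // invoke() only blocks once this small buffer is full.
    private transient BlockingQueue<List<T>> pending;
    private transient Thread writer;
    private volatile boolean running;

    @Override
    public void open(Configuration parameters) {
        pending = new ArrayBlockingQueue<>(2);
        running = true;
        writer = new Thread(() -> {
            try {
                while (running || !pending.isEmpty()) {
                    List<T> batch = pending.poll(100, TimeUnit.MILLISECONDS);
                    if (batch != null) {
                        copyRows(batch); // the blocking COPY into Greenplum
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "copy-writer");
        writer.start();
    }

    @Override
    public void invoke(List<T> rows, Context context) throws Exception {
        // Cheap unless the writer has fallen a couple of full batches behind.
        pending.put(rows);
    }

    @Override
    public void close() throws Exception {
        running = false;          // writer drains whatever is left, then exits
        if (writer != null) {
            writer.join();
        }
    }

    private void copyRows(List<T> rows) {
        // placeholder for the JDBC COPY / bulk load
    }
}

I realize a hand-rolled buffer like this sidesteps checkpointing of the 
in-flight batch, which is part of why I’m asking whether there’s a 
configuration option that achieves the same overlap instead.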


Thanks!

Kurt
