Thanks Roman. I had already tried disableChaining; it didn’t have any effect.
The built-in JDBC sink is really slow compared to a bulk load (close to 100x),
but I had tested it and saw the same issue. When a given message triggers
the JDBC sink to write a batch, everything else waits for it.
From: Roman Khachatryan
Date: Thursday, April 29, 2021 at 2:49 PM
To: Kurtis Walker
Cc: user@flink.apache.org
Subject: Re: Backpressure configuration
Hello Kurt,
Assuming that your sink is blocking, I would first make sure that it
is not chained with the preceding operators. Otherwise, the same
thread will output data and perform windowing/triggering.
You can add disableChaining after addSink to prevent this [1].
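For example, with the pipeline from your snippet below, it would look roughly
like this (just a sketch, not tested):

env.addSource(...)
    ...
    .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions)))
    .name("copysink")
    .disableChaining();  // force the sink into its own task, so upstream operators keep their own thread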
Besides that, you probably could use existing JDBC batching
functionality by configuring JdbcExecutionOptions [2] and providing it
to JdbcSink.sink() [3].
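A rough sketch (not tested; the SQL, row type, and connection settings below
are placeholders for whatever your job actually uses):

JdbcSink.sink(
    "INSERT INTO target_table (id, payload) VALUES (?, ?)",      // placeholder SQL
    (PreparedStatement ps, YourRowType row) -> {                  // placeholder row type / getters
        ps.setLong(1, row.getId());
        ps.setString(2, row.getPayload());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(5000)            // flush after 5000 rows...
        .withBatchIntervalMs(60_000)    // ...or after one minute, whichever comes first
        .withMaxRetries(3)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://greenplum-host:5432/db")      // placeholder URL
        .withDriverName("org.postgresql.Driver")
        .build());

The resulting SinkFunction can be passed to addSink() the same way as your CopySink.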
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html
Regards,
Roman
On Thu, Apr 29, 2021 at 6:12 PM Kurtis Walker wrote:
>
> Hello,
>
> I’m building a POC for a Flink app that loads data from Kafka into a
> Greenplum data warehouse. I’ve built a custom sink operation that bulk
> loads a number of rows. I’m using a global window, triggering on the number
> of records, to collect the rows for each sink. I’m finding that while the
> sink is running, the previous operations of my app stop processing messages
> until the sink operation completes. I guess this is the backpressure logic
> kicking in. The cost is that I get about 60% of the theoretically possible
> throughput. Is there any configuration that would let me control that
> backpressure so that Flink buffers rows when it encounters backpressure?
> Ideally, when a sink operation completes, the next batch of rows would be
> ready for the sink to pick up immediately. Here’s my code:
>
> env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
>     .keyBy((KeySelector) value -> value.getPayload().getSource().getTable())
>     .window(GlobalWindows.create())
>     .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
>     .aggregate(new ListAggregator())
>     .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");
>
> Thanks!
>
> Kurt