Hey all,

I've been working with JdbcSink and it's really made my life much easier,
but I had two questions about it that folks might be able to answer or
provide some clarity around.

*Accessing Statement Execution / Results*

Is there any mechanism in place (or out of the box) for reading the
results of statements executed by the JdbcSink, or would I need to implement
my own sink to support it?

The problem that I'm trying to solve relates to observability (i.e.
metrics): I'd like to increment specific counters based on the result of a
given statement. For example, if I upsert 40 incoming widgets, some of
which may be the same widget, I only want to increment my metric for
widgets that didn't already exist, which I could determine from the results
of the underlying queries.
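
To make the idea concrete, here is a minimal sketch of what I'd like to do
with the results. It assumes I could somehow get hold of the update counts
that JDBC's Statement.executeBatch() returns (an IntArray in Kotlin), which
is exactly the part I don't see exposed. The helper names are hypothetical,
and whether a count of 1 really means "newly inserted" depends on the
database and the upsert statement in use.

```kotlin
import java.sql.Statement

// Hypothetical helper: counts batch entries that report exactly one affected row.
// ASSUMPTION: for the upsert statement in use, 1 means "row was newly inserted".
// This is database-specific and should be verified before trusting the metric.
fun countSingleRowResults(updateCounts: IntArray): Int =
    updateCounts.count { it == 1 }

// Hypothetical helper: counts entries where the driver reported success without
// a row count (SUCCESS_NO_INFO) or a failed statement (EXECUTE_FAILED).
fun countUnknownResults(updateCounts: IntArray): Int =
    updateCounts.count {
        it == Statement.SUCCESS_NO_INFO || it == Statement.EXECUTE_FAILED
    }
```

The first value is what I'd add to my "new widgets" counter, and the second
would tell me how many results I couldn't classify at all.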

*Batching Mechanisms (withBatchIntervalMs & withBatchSize)*

This was another great feature that I was happy to see since I didn't want
to handle writing my own windowing logic for something as trivial as this.
I noticed some odd behavior when I tried to drive this from
configuration:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.jdbc.JdbcExecutionOptions

private fun getJdbcExecutionOptions(parameters: ParameterTool): JdbcExecutionOptions {
    var executionOptions = JdbcExecutionOptions.builder()
    // Only override the builder defaults when batching is explicitly enabled
    if (parameters.getBoolean("database.batching.enabled", false)) {
        // Time-based trigger, in milliseconds
        if (parameters.has("database.batching.ms")) {
            val batchingIntervalMs = parameters.getLong("database.batching.ms")
            executionOptions = executionOptions
                .withBatchIntervalMs(batchingIntervalMs)
        }

        // Record-count-based trigger
        if (parameters.has("database.batching.records")) {
            val batchingRecords = parameters.getInt("database.batching.records")
            executionOptions = executionOptions
                .withBatchSize(batchingRecords)
        }
    }

    return executionOptions.build()
}

With settings of 60000 (batchIntervalMs) and 100 (batchSize), it took
around 7-8 minutes before a write to the destination took place. Previously,
when using only the batchIntervalMs configuration, I'd see it consistently
write out once a minute.

I was looking through the source and it seems the time-based emissions are
scheduled asynchronously. I may have missed something, but I didn't see
anything where a record-count-based emission would affect the scheduled
emission.

I'm just trying to confirm whether these work together as an OR
operation (i.e. flush the pending records once a given number of records
have been seen or once a time interval has elapsed).
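
In case it helps clarify the question, this is a toy model of the OR
behavior I'd expect. It is not Flink's implementation, just a sketch in
plain Kotlin: OrBatcher, add, and onTimer are hypothetical names, and I'm
assuming some scheduler would call onTimer() every batchIntervalMs.

```kotlin
// Toy model only: NOT Flink's code. All names here are hypothetical.
class OrBatcher<T>(
    private val batchSize: Int,
    private val flushAction: (List<T>) -> Unit
) {
    private val pending = mutableListOf<T>()

    // Size-based trigger: flush as soon as batchSize records are buffered.
    @Synchronized
    fun add(record: T) {
        pending.add(record)
        if (batchSize > 0 && pending.size >= batchSize) flush()
    }

    // Time-based trigger: assumed to be invoked by a scheduler every
    // batchIntervalMs, regardless of how many records are pending.
    @Synchronized
    fun onTimer() {
        flush()
    }

    // Emits a copy of the pending records, then clears the buffer.
    private fun flush() {
        if (pending.isEmpty()) return
        flushAction(pending.toList())
        pending.clear()
    }
}
```

With batchSize = 3, adding four records would flush the first three
immediately, and the fourth would go out on the next onTimer() call. Under
this model I'd never expect to wait longer than one interval for a write,
which is why the 7-8 minute delay surprised me.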

Thanks so much, you folks have been an incredible community in my short
time here and I've enjoyed working with Flink, contributing, and I hope to
continue to do much more!

Rion
