Hey all, I've been working with JdbcSink and it's really made my life much easier, but I had two questions about it that folks might be able to answer or provide some clarity around.

*Accessing Statement Execution / Results*

Is there any mechanism in place (or out of the box) to support reading the results of statements executed by the JdbcSink, or would I need to implement my own to support it?

The problem that I'm trying to solve relates to observability (i.e. metrics) and incrementing specific counters based on the response from a given statement executing. For example, if I need to upsert 40 incoming widgets, some of which may be the same widget, I only want to increment my metric if the widget didn't already exist, which I could determine from the response of the underlying queries.

*Batching Mechanisms (withBatchIntervalMs & withBatchSize)*

This was another great feature that I was happy to see, since I didn't want to write my own windowing logic for something as trivial as this. I noticed some odd behavior when I attempted to drive it from configuration:

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions

    private fun getJdbcExecutionOptions(parameters: ParameterTool): JdbcExecutionOptions {
        // The builder is mutable, so it can be configured in place
        val builder = JdbcExecutionOptions.builder()
        if (parameters.getBoolean("database.batching.enabled", false)) {
            // Optional time-based flush interval (milliseconds)
            if (parameters.has("database.batching.ms")) {
                builder.withBatchIntervalMs(parameters.getLong("database.batching.ms"))
            }
            // Optional size-based flush threshold (number of records)
            if (parameters.has("database.batching.records")) {
                builder.withBatchSize(parameters.getInt("database.batching.records"))
            }
        }
        return builder.build()
    }

With the settings of 60000 (batchIntervalMs) and 100 (batchSize), it took around 7-8 minutes before a write to the destination took place. Previously, when using only the batchIntervalMs configuration, I'd see it consistently write out once a minute. I was looking through the source and it seems the time-based emissions are scheduled asynchronously.
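
To make the question concrete, the OR behavior I have in mind can be sketched roughly as below. This is only an illustration of the semantics I'm expecting, not Flink's implementation: `OrBatcher`, `onTimer`, `flushAction`, and `demo` are hypothetical names, and the timer is modeled as an explicit method call (which a scheduler would invoke every batchIntervalMs) so the behavior stays deterministic.

```kotlin
// Hypothetical sketch of "flush on size OR on interval"; not Flink's actual code.
class OrBatcher<T>(
    private val batchSize: Int,
    private val flushAction: (List<T>) -> Unit
) {
    private val pending = mutableListOf<T>()

    // Size-based trigger: flush as soon as batchSize records are buffered.
    @Synchronized
    fun add(record: T) {
        pending.add(record)
        if (pending.size >= batchSize) flush()
    }

    // Time-based trigger: assumed to be called by a scheduler every batchIntervalMs.
    @Synchronized
    fun onTimer() = flush()

    // Emits whatever is pending; does nothing when the buffer is empty.
    private fun flush() {
        if (pending.isEmpty()) return
        flushAction(pending.toList())
        pending.clear()
    }
}

// Usage: four records with batchSize = 3, followed by one timer tick.
fun demo(): List<List<Int>> {
    val flushed = mutableListOf<List<Int>>()
    val batcher = OrBatcher<Int>(batchSize = 3) { flushed.add(it) }
    (1..4).forEach(batcher::add) // size trigger fires after the third record
    batcher.onTimer()            // timer trigger flushes the leftover record
    return flushed               // [[1, 2, 3], [4]]
}
```

Under these semantics, neither trigger should delay the other: a partially filled batch never waits longer than one interval, and a full batch never waits for the timer.
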
I may have missed something, but I didn't explicitly see anything where a records-based emission would affect the scheduled emission. I'm just trying to get confirmation that these work together as an OR operation (i.e. flush the pending records once a given number of records have been seen or once a time interval has elapsed).

Thanks so much, you folks have been an incredible community in my short time here and I've enjoyed working with Flink, contributing, and I hope to continue to do much more!

Rion