Hi Rion,

We're using a plain Kafka producer to send records to the DLQ.

Regards,
Maciek
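(A minimal, illustrative sketch of that approach - a plain KafkaProducer created in open() and used in the catch block to dead-letter a failed record - assuming kafka-clients is on the classpath; the class name `JdbcWithDlqSink`, the topic, and the producer config below are placeholder assumptions, not the actual implementation referenced here:)

```
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

class JdbcWithDlqSink<T>(
    private val inner: SinkFunction<T>,
    private val bootstrapServers: String,
    private val dlqTopic: String,
) : RichSinkFunction<T>() {

    @Transient private var producer: KafkaProducer<String, String>? = null

    override fun open(parameters: Configuration) {
        // NOTE: if `inner` is itself a rich/checkpointed function, its lifecycle and
        // state calls (open/close/setRuntimeContext/snapshotState/initializeState)
        // also need to be forwarded to it for it to work.
        producer = KafkaProducer<String, String>(Properties().apply {
            put("bootstrap.servers", bootstrapServers) // placeholder config
            put("key.serializer", StringSerializer::class.java.name)
            put("value.serializer", StringSerializer::class.java.name)
        })
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            inner.invoke(value, context)
        } catch (ex: Exception) {
            // Dead-letter the failed record instead of failing the job.
            producer?.send(ProducerRecord(dlqTopic, value.toString()))
        }
    }

    override fun close() {
        producer?.flush()
        producer?.close()
    }
}
```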
On Tue, 3 Aug 2021 at 18:07, Rion Williams <rionmons...@gmail.com> wrote:
>
> Thanks Maciek,
>
> It looks like my initial issue had another problem: a bad (or improper) interface was being used, but after changing that and ensuring all of the fields were implemented, it worked as expected.
>
> I know in my particular case I'm planning on writing to Kafka; however, my wrapped function isn't a process function, so it isn't as simple as supplying a side-output and sending those records to Kafka. I'm guessing in this scenario it'd be sufficient to have a plain Kafka producer (created within the open() function) and just use that, as opposed to constructing a sink and explicitly invoking it.
>
> ```
> catch (exception: Exception) {
>     // I'd imagine that the context here would require a second level of
>     // mapping to ensure that we have the proper context for the sink itself
>     dlqSink.invoke(value, context)
>
>     // Or this would be the alternative
>     dlqProducer.send(..., value)
> }
> ```
>
> I don't know if you have the same scenario (or are leveraging Kafka), but if so, I'd be curious about the approach that you took.
>
> Thanks much,
>
> Rion
>
> On 2021/08/03 08:45:07, Maciej Obuchowski <obuchowski.mac...@gmail.com> wrote:
> > Hey.
> >
> > As far as I see, you're not overriding functions like open, setRuntimeContext, snapshotState, initializeState - the calls need to be passed to the inner sink function.
> >
> > On Mon, 2 Aug 2021 at 19:31, Rion Williams <rionmons...@gmail.com> wrote:
> > >
> > > Hi again Maciek (and all),
> > >
> > > I just recently returned to investigating this approach; however, I can't seem to get the underlying invocation to work as I would normally expect. I'll share a bit more of what I currently have, and perhaps I'm just missing something minor that someone may be able to spot.
> > >
> > > To reiterate - what I'm attempting to do is take a stream of events flowing through, extract specific types of entities from these events into multiple side-outputs, and pass those side-outputs to a sink that writes them via JDBC using logic specific to that entity. What I'm aiming to achieve is to capture a single record that may be problematic (a poison pill) and throw it onto a dead-letter queue (Kafka) instead of letting it fail the job. I understand this would mean limiting batch sizes to a single record; however, I'm assuming the connections themselves could possibly be pooled to avoid opening a new connection per call. If that isn't the case, is there a way to handle it (or would I need to implement my own sink)?
> > >
> > > ```
> > > val users = Tags.users
> > > parsedChangelogs
> > >     .getSideOutput(users)
> > >     .addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
> > >     .uid("sync-${users.id}-to-postgres")
> > >     .name("sync-${users.id}-to-postgres")
> > >
> > > val addresses = Tags.addresses
> > > parsedChangelogs
> > >     .getSideOutput(addresses)
> > >     .addSink(PostgresSink.fromEntityType(addresses.typeInfo, parameters))
> > >     .uid("sync-${addresses.id}-to-postgres")
> > >     .name("sync-${addresses.id}-to-postgres")
> > > ```
> > >
> > > And the dynamic sink (which associates a given entity with the necessary calls made to the database) looks a bit like this:
> > >
> > > ```
> > > fun <T : Any> fromEntityType(typeInfo: TypeInformation<T>, parameters: ParameterTool): SinkFunction<T> {
> > >     val metadata = getQueryMetadataFromType(typeInfo)
> > >
> > >     return JdbcSink
> > >         .sink(
> > >             metadata.query,
> > >             metadata.statement,
> > >             getJdbcExecutionOptions(parameters),
> > >             JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
> > >                 .withDriverName("org.postgresql.Driver")
> > >                 .withUrl(buildConnectionString(parameters))
> > >                 .build(),
> > >         )
> > > }
> > > ```
> > >
> > > I've tried several approaches; a naive wrapper that I attempted looked something like this:
> > >
> > > ```
> > > class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool) : SinkFunction<T> {
> > >     private val logger = LoggerFactory.getLogger(DlqWrapper::class.java)
> > >     private val dlqSink: SinkFunction<String> = ...
> > >
> > >     override fun invoke(value: T, context: SinkFunction.Context) {
> > >         try {
> > >             sink.invoke(value, context)
> > >         }
> > >         catch (ex: Exception) {
> > >             logger.error("Encountered sink exception. Sending message to dead letter queue. Value: $value. Exception: ${ex.message}")
> > >             val payload = Gson().toJsonTree(value).asJsonObject
> > >             payload.addProperty("exception", ex.message)
> > >
> > >             dlqSink.invoke("$payload", context)
> > >         }
> > >     }
> > > }
> > > ```
> > >
> > > After doing this, it doesn't look like the invoke calls are actually attempting the JDBC calls to insert the records into those destinations. I'm not entirely sure whether this is related specifically to how the JdbcSink is wrapped (via the GenericJdbcSink, etc.).
> > >
> > > I had seen several posts involving the use of an InvocationHandler/Proxy, etc., but I'm not sure if that should be necessary for handling this type of functionality. Any ideas/thoughts/examples would be greatly appreciated.
> > >
> > > Thanks,
> > >
> > > Rion
> > >
> > > On 2021/07/14 15:47:18, Maciej Bryński <mac...@brynski.pl> wrote:
> > > > This is the idea.
> > > > Of course you need to wrap more functions like: open, close, notifyCheckpointComplete, snapshotState, initializeState and setRuntimeContext.
> > > >
> > > > The problem is that if you want to catch the problematic record you need to set the batch size to 1, which gives very bad performance.
> > > >
> > > > Regards,
> > > > Maciek
> > > >
> > > > On Wed, 14 Jul 2021 at 17:31, Rion Williams <rionmons...@gmail.com> wrote:
> > > > >
> > > > > Hi Maciej,
> > > > >
> > > > > Thanks for the quick response. I wasn't aware of the idea of using a SinkWrapper, but I'm not quite certain that it would suit this specific use case (as a SinkFunction / RichSinkFunction doesn't appear to support side-outputs).
> > > > > Essentially, what I'd hope to accomplish would be to pick up when a bad record could not be written to the sink and then offload it via a side-output somewhere else.
> > > > >
> > > > > Something like this, which is a very, very naive idea:
> > > > >
> > > > > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>) : RichSinkFunction<T>() {
> > > > >     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> > > > >
> > > > >     override fun invoke(value: T, context: SinkFunction.Context) {
> > > > >         try {
> > > > >             sink.invoke(value, context)
> > > > >         }
> > > > >         catch (exception: Exception) {
> > > > >             logger.error("Encountered a bad record, offloading to dead-letter-queue")
> > > > >             // Offload bad record to DLQ
> > > > >         }
> > > > >     }
> > > > > }
> > > > >
> > > > > But I think that's basically the gist of it. I'm just not sure how I could go about doing this aside from perhaps writing a custom process function that wraps another sink function (or just completely rewriting my own JdbcSink?)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Rion
> > > > >
> > > > > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <mac...@brynski.pl> wrote:
> > > > >>
> > > > >> Hi Rion,
> > > > >> We have implemented such a solution with a Sink Wrapper.
> > > > >>
> > > > >> Regards,
> > > > >> Maciek
> > > > >>
> > > > >> On Wed, 14 Jul 2021 at 16:21, Rion Williams <rionmons...@gmail.com> wrote:
> > > > >> >
> > > > >> > Hi all,
> > > > >> >
> > > > >> > Recently I've been encountering an issue where some external dependency or process causes writes within my JdbcSink to fail (e.g. something is being inserted with an explicit constraint that never made its way there). I'm trying to see if there's a pattern or recommendation for handling this, similar to a dead-letter queue.
> > > > >> >
> > > > >> > Basically - if I experience a given number of failures (> max retry attempts) when writing to my JDBC destination, I'd like to take the record that was attempted and throw it into a Kafka topic or some other destination so that it can be evaluated at a later time.
> > > > >> >
> > > > >> > Are there any well-defined patterns or recommended approaches around this?
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Rion
> > > > >>
> > > > >> --
> > > > >> Maciek Bryński
> > > >
> > > > --
> > > > Maciek Bryński

--
Maciek Bryński
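(For completeness, a minimal sketch of the delegation described above - forwarding open, close, setRuntimeContext, snapshotState, initializeState and notifyCheckpointComplete to the wrapped sink, which is what the JdbcSink's function needs in order to open its connection; the class name `DelegatingDlqSink`, the separate DLQ sink parameter, and the naive String serialization are illustrative assumptions, not code from this thread:)

```
import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.CheckpointListener
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

class DelegatingDlqSink<T>(
    private val inner: SinkFunction<T>,
    private val dlq: SinkFunction<String>,
) : RichSinkFunction<T>(), CheckpointedFunction, CheckpointListener {

    override fun setRuntimeContext(t: RuntimeContext) {
        super.setRuntimeContext(t)
        // Forward the runtime context so rich inner sinks can be opened properly.
        (inner as? RichFunction)?.setRuntimeContext(t)
        (dlq as? RichFunction)?.setRuntimeContext(t)
    }

    override fun open(parameters: Configuration) {
        (inner as? RichFunction)?.open(parameters)
        (dlq as? RichFunction)?.open(parameters)
    }

    override fun close() {
        (inner as? RichFunction)?.close()
        (dlq as? RichFunction)?.close()
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (inner as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        (inner as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun notifyCheckpointComplete(checkpointId: Long) {
        (inner as? CheckpointListener)?.notifyCheckpointComplete(checkpointId)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            inner.invoke(value, context)
        } catch (ex: Exception) {
            // Divert the failing record to the DLQ sink instead of failing the job;
            // serializing `value` with toString() is deliberately naive here.
            dlq.invoke(value.toString(), context)
        }
    }
}
```

Even with a wrapper like this, the JdbcExecutionOptions batch size still has to be limited to a single record for the failing record to be identifiable, as Maciek points out above.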