Yes, this is the idea.
Of course, you also need to wrap the other functions, such as open, close,
notifyCheckpointComplete, snapshotState, initializeState and
setRuntimeContext.
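
A minimal sketch of what delegating those functions could look like. It
assumes Flink 1.13-era APIs (open(Configuration), the non-generic
SinkFunction.Context) and uses FunctionUtils to forward lifecycle calls only
when the wrapped sink is a rich function. The class name DelegatingSinkWrapper
is hypothetical, and the DLQ hand-off is left as a placeholder, as in the
quoted example.

```kotlin
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.state.CheckpointListener
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical wrapper: forwards every lifecycle and checkpoint call to the
// wrapped sink so that it behaves as if Flink were driving it directly.
class DelegatingSinkWrapper<T>(private val sink: SinkFunction<T>) :
    RichSinkFunction<T>(), CheckpointedFunction, CheckpointListener {

    override fun setRuntimeContext(context: RuntimeContext) {
        super.setRuntimeContext(context)
        // No-op if the wrapped sink is not a RichFunction.
        FunctionUtils.setFunctionRuntimeContext(sink, context)
    }

    override fun open(parameters: Configuration) {
        FunctionUtils.openFunction(sink, parameters)
    }

    override fun close() {
        FunctionUtils.closeFunction(sink)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        (sink as? CheckpointedFunction)?.initializeState(context)
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // For a batching sink, this is typically where pending records are flushed.
        (sink as? CheckpointedFunction)?.snapshotState(context)
    }

    override fun notifyCheckpointComplete(checkpointId: Long) {
        (sink as? CheckpointListener)?.notifyCheckpointComplete(checkpointId)
    }

    override fun invoke(value: T, context: SinkFunction.Context) {
        try {
            sink.invoke(value, context)
        } catch (exception: Exception) {
            // Placeholder: offload `value` to the dead-letter destination here.
        }
    }
}
```

Note that the try/catch only covers invoke. A failure raised while the wrapped
sink flushes inside snapshotState or close would still propagate and fail the
job.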

The problem is that if you want to catch the problematic record, you need
to set the batch size to 1, which gives very bad performance.
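
The reason is that a batching sink only executes the statements when the
batch is flushed, so with a larger batch the exception surfaces on whichever
record triggers the flush (or at a checkpoint), not necessarily on the bad
one. A sketch of the corresponding setting, assuming the flink-connector-jdbc
builder API; the retry count is an illustrative value:

```kotlin
import org.apache.flink.connector.jdbc.JdbcExecutionOptions

// Flush after every record so that a failed write maps to exactly one record.
val executionOptions: JdbcExecutionOptions = JdbcExecutionOptions.builder()
    .withBatchSize(1)
    .withMaxRetries(3) // illustrative; retries are attempted before the exception is thrown
    .build()
```

These options would be passed as the execution-options argument of
JdbcSink.sink(...) when building the sink that gets wrapped.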

Regards,
Maciek

Wed, 14 Jul 2021 at 17:31 Rion Williams <rionmons...@gmail.com> wrote:
>
> Hi Maciej,
>
> Thanks for the quick response. I wasn't aware of the idea of using a 
> SinkWrapper, but I'm not quite certain that it would suit this specific use 
> case (as a SinkFunction / RichSinkFunction doesn't appear to support 
> side-outputs). Essentially, what I'd hope to accomplish would be to pick up 
> when a bad record could not be written to the sink and then offload that via 
> a side-output somewhere else.
>
> Something like this, which is a very, very naive idea:
>
> class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>) : RichSinkFunction<T>() {
>     private val logger = LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
>
>     override fun invoke(value: T, context: SinkFunction.Context) {
>         try {
>             sink.invoke(value, context)
>         } catch (exception: Exception) {
>             logger.error("Encountered a bad record, offloading to dead-letter-queue")
>             // Offload bad record to DLQ
>         }
>     }
> }
>
> But I think that's basically the gist of it. I'm just not sure how I could go 
> about doing this aside from perhaps writing a custom process function that 
> wraps another sink function (or just completely rewriting my own JdbcSink?)
>
> Thanks,
>
> Rion
>
>
>
>
>
> On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <mac...@brynski.pl> wrote:
>>
>> Hi Rion,
>> We have implemented such a solution with Sink Wrapper.
>>
>>
>> Regards,
>> Maciek
>>
>> Wed, 14 Jul 2021 at 16:21 Rion Williams <rionmons...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Recently I've been encountering an issue where some external dependency 
>> > or process causes writes within my JDBCSink to fail (e.g. something is 
>> > being inserted with an explicit constraint that never made its way 
>> > there). I'm trying to see if there's a pattern or recommendation for 
>> > handling this, similar to a dead-letter queue.
>> >
>> > Basically - if I experience a given number of failures (> max retry 
>> > attempts) when writing to my JDBC destination, I'd like to take the record 
>> > that was attempted and throw it into a Kafka topic or some other 
>> > destination so that it can be evaluated at a later time.
>> >
>> > Are there any well-defined patterns or recommended approaches around this?
>> >
>> > Thanks,
>> >
>> > Rion
>>
>>
>>
>> --
>> Maciek Bryński



-- 
Maciek Bryński
