Hi Rion,
We're using plain Kafka producer to send records to DLQ.

Regards,
Maciek

wt., 3 sie 2021 o 18:07 Rion Williams <rionmons...@gmail.com> napisał(a):
>
> Thanks Maciek,
>
> It looks like my initial issue had another problem with a bad interface that 
> was being used (or an improper one), but after changing that and ensuring all 
> of the fields were implemented it worked as expected.
>
> I know in my particular case, I'm planning on writing to Kafka, however my 
> wrapped function isn't a process function and thus it isn't as simple as 
> supplying a side-output and sending those to Kafka. I'm guessing in this 
> scenario, it'd be sufficient to have a plain Kafka producer (created within 
> the open() function) and just use that as opposed to constructing a sink and 
> explicitly invoking it.
>
> ```
> catch (exception: Exception) {
>      // I'd imagine that the context here would require a second level of 
> mapping to ensure that
>      // we have the proper context for the sink itself
>      dlqSink.invoke(value, context)
>
>      // Or this would be the alternative
>      dlqProducer.send(..., value)
> }
> ```
>
> I don't know if you have the same scenario (or are leveraging Kafka), but if 
> so, I'd be curious of the approach that you took.
>
> Thanks much,
>
> Rion
>
> On 2021/08/03 08:45:07, Maciej Obuchowski <obuchowski.mac...@gmail.com> wrote:
> > Hey.
> >
> > As far as I see, you're not overriding functions like open,
> > setRuntimeContext, snapshotState, initializeState - the calls needs to
> > be passed to the inner sink function.
> >
> > pon., 2 sie 2021 o 19:31 Rion Williams <rionmons...@gmail.com> napisał(a):
> > >
> > > Hi again Maciek (and all),
> > >
> > > I just recently returned to start investigating this approach, however I 
> > > can't seem to get the underlying invocation to work as I would normally 
> > > expect. I'll try to share a bit more as what I currently have and perhaps 
> > > I'm just missing something minor that someone may be able to spot.
> > >
> > > To reiterate - what I'm attempting to do is take a stream of events 
> > > flowing through, specific types of entities are extracted from these 
> > > events into multiple side-outputs, and these side-outputs are passed to a 
> > > sync that will write them via JDBC using logic specific to that entity. 
> > > What I am aiming to achieve is being able to capture a single record that 
> > > may be problematic and avoid a poison pill to throw onto a dead-letter 
> > > queue (Kafka). I understand this would mean limiting batching sizes to a 
> > > single record, however I'm assuming that the connections themselves could 
> > > be pooled possibly to avoid opening up a new connection per call. If this 
> > > isn't the case, is there a way to handle that (or would I need to 
> > > implement my own sync).
> > >
> > > ```
> > > val users = Tags.users
> > >         parsedChangelogs
> > >             .getSideOutput(users)
> > >             .addSink(PostgresSink.fromEntityType(users.typeInfo, 
> > > parameters))
> > >             .uid("sync-${users.id}-to-postgres")
> > >             .name("sync-${users.id}-to-postgres")
> > >
> > > val addresses = Tags.addresses
> > >         parsedChangelogs
> > >             .getSideOutput(addresses)
> > >             .addSink(PostgresSink.fromEntityType(addresses.typeInfo, 
> > > parameters))
> > >             .uid("sync-${addresses.id}-to-postgres")
> > >             .name("sync-${addresses.id}-to-postgres")
> > > ```
> > >
> > > And the dynamic sink (that would associate a given entity to the 
> > > necessary calls made to the database) looks a bit like this:
> > >
> > > ```
> > > fun <T: Any> fromEntityType(typeInfo: TypeInformation<T>, parameters: 
> > > ParameterTool): SinkFunction<T> {
> > >         val metadata = getQueryMetadataFromType(typeInfo)
> > >
> > >         return JdbcSink
> > >             .sink(
> > >                 metadata.query,
> > >                 metadata.statement,
> > >                 getJdbcExecutionOptions(parameters),
> > >                 JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
> > >                     .withDriverName("org.postgresql.Driver")
> > >                     .withUrl(buildConnectionString(parameters))
> > >                     .build(),
> > >             )
> > >     }
> > > ```
> > >
> > > I've tried several, a naive wrapper approach that I attempted looked 
> > > something like this:
> > >
> > > ```
> > > class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: 
> > > ParameterTool): SinkFunction<T> {
> > >     private val logger = LoggerFactory.getLogger(DlqSink::class.java)
> > >     private val dlqSink: SinkFunction<String> = ...
> > >
> > >     override fun invoke(value: T, context: SinkFunction.Context) {
> > >         try {
> > >             sink.invoke(value, context)
> > >         }
> > >         catch (ex: Exception) {
> > >             logger.error("Encountered sink exception. Sending message to 
> > > dead letter queue. Value: $value. Exception: ${ex.message}")
> > >             val payload = Gson().toJsonTree(value).asJsonObject
> > >             payload.addProperty("exception", ex.message)
> > >
> > >             dlqSink.invoke("$payload", context)
> > >         }
> > >     }
> > > }
> > > ```
> > >
> > > After doing this, it doesn't look like when the invoke calls are made 
> > > that it's actually attempting to perform the JDBC calls to insert the 
> > > records into those sources. I'm not entirely sure if this is related 
> > > specifically for how the JdbcSink is wrapped (via the GenericJdbcSink, 
> > > etc.).
> > >
> > > I had seen several posts around involving the use of an 
> > > InvocationHandler/Proxy, etc. but I'm not sure if that should be 
> > > necessary for handling this type of functionality. Any 
> > > ideas/thoughts/examples would be greatly appreciated.
> > >
> > > Thanks,
> > >
> > > Rion
> > >
> > > On 2021/07/14 15:47:18, Maciej Bryński <mac...@brynski.pl> wrote:
> > > > This is the idea.
> > > > Of course you need to wrap more functions like: open, close,
> > > > notifyCheckpointComplete, snapshotState, initializeState and
> > > > setRuntimeContext.
> > > >
> > > > The problem is that if you want to catch problematic record you need
> > > > to set batch size to 1, which gives very bad performance.
> > > >
> > > > Regards,
> > > > Maciek
> > > >
> > > > śr., 14 lip 2021 o 17:31 Rion Williams <rionmons...@gmail.com> 
> > > > napisał(a):
> > > > >
> > > > > Hi Maciej,
> > > > >
> > > > > Thanks for the quick response. I wasn't aware of the idea of using a 
> > > > > SinkWrapper, but I'm not quite certain that it would suit this 
> > > > > specific use case (as a SinkFunction / RichSinkFunction doesn't 
> > > > > appear to support side-outputs). Essentially, what I'd hope to 
> > > > > accomplish would be to pick up when a bad record could not be written 
> > > > > to the sink and then offload that via a side-output somewhere else.
> > > > >
> > > > > Something like this, which is a very, very naive idea:
> > > > >
> > > > > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): 
> > > > > RichSinkFunction<T>() {
> > > > >     private val logger = 
> > > > > LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> > > > >
> > > > >     override fun invoke(value: T, context: SinkFunction.Context) {
> > > > >         try {
> > > > >             sink.invoke(value, context)
> > > > >         }
> > > > >         catch (exception: Exception){
> > > > >             logger.error("Encountered a bad record, offloading to 
> > > > > dead-letter-queue")
> > > > >             // Offload bad record to DLQ
> > > > >         }
> > > > >     }
> > > > > }
> > > > >
> > > > > But I think that's basically the gist of it. I'm just not sure how I 
> > > > > could go about doing this aside from perhaps writing a custom process 
> > > > > function that wraps another sink function (or just completely 
> > > > > rewriting my own JdbcSink?)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Rion
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński <mac...@brynski.pl> 
> > > > > wrote:
> > > > >>
> > > > >> Hi Rion,
> > > > >> We have implemented such a solution with Sink Wrapper.
> > > > >>
> > > > >>
> > > > >> Regards,
> > > > >> Maciek
> > > > >>
> > > > >> śr., 14 lip 2021 o 16:21 Rion Williams <rionmons...@gmail.com> 
> > > > >> napisał(a):
> > > > >> >
> > > > >> > Hi all,
> > > > >> >
> > > > >> > Recently I've been encountering an issue where some external 
> > > > >> > dependencies or process causes writes within my JDBCSink to fail 
> > > > >> > (e.g. something is being inserted with an explicit constraint that 
> > > > >> > never made it's way there). I'm trying to see if there's a pattern 
> > > > >> > or recommendation for handling this similar to a dead-letter queue.
> > > > >> >
> > > > >> > Basically - if I experience a given number of failures (> max 
> > > > >> > retry attempts) when writing to my JDBC destination, I'd like to 
> > > > >> > take the record that was attempted and throw it into a Kafka topic 
> > > > >> > or some other destination so that it can be evaluated at a later 
> > > > >> > time.
> > > > >> >
> > > > >> > Are there any well defined patterns or recommended approaches 
> > > > >> > around this?
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Rion
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Maciek Bryński
> > > >
> > > >
> > > >
> > > > --
> > > > Maciek Bryński
> > > >
> >



-- 
Maciek Bryński

Reply via email to