Jun, The problem with it not being 1-1 is that Connect relies heavily on
offsets, so we'd need to be able to track offsets at this finer
granularity. Filtering is ok, but flatMap isn't. If you convert one message
to many, what are the offsets for the new messages? One possibility would
be to assume that transformations are deterministic and then "enhance" the
offsets with an extra integer field that indicates the message's position
in the generated set. For sources this seems attractive since you can then
reset to
whatever the connector-provided offset is and then filter out any of the
"sub"-messages that are earlier than the recorded "sub"-offset. But this
might not actually work for sources since a) the offsets will include extra
fields that the connector doesn't expect (might be ok since we handle that
data as schemaless anyway) and b) if we allow multiple transformations
(which seems likely given that people might want to do things like
rearrange fields + filter messages) then offsets start getting quite
complex as we add sub-sub-offsets and sub-sub-sub-offsets. It's doable, but
seems messy.
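
To make that concrete, here's a rough sketch (plain Java, not actual
Connect APIs; the "_sub" key name is invented purely for illustration) of
how a source offset could carry a sub-position and how restart filtering
would use it, assuming deterministic transformations:

import java.util.HashMap;
import java.util.Map;

public class SubOffsets {

    // Attach a position-within-expansion to the connector's own offset map.
    static Map<String, Object> enhance(Map<String, Object> connectorOffset,
                                       int subPosition) {
        Map<String, Object> enhanced = new HashMap<>(connectorOffset);
        enhanced.put("_sub", subPosition);
        return enhanced;
    }

    // On restart: reset to the connector-provided offset, re-run the
    // (assumed deterministic) transformation, and drop any generated record
    // whose sub-position is at or before the committed one.
    static boolean alreadyDelivered(Map<String, Object> committedOffset,
                                    int subPosition) {
        Object committedSub = committedOffset.get("_sub");
        return committedSub instanceof Integer
                && subPosition <= (Integer) committedSub;
    }
}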

Things aren't as easy on the sink side. Since we track offsets using Kafka
offsets we either need to use the extra metadata space to store the
sub-offsets or we need to ensure that we only ever need to commit offsets
on Kafka message boundaries. We might be able to get away with just
delivering the entire set of generated messages in a single put() call,
which the connector is expected to either fully accept or fully reject (via
exception). However, this may end up interacting poorly with assumptions
connectors might make if we expose things like max.poll.records, where they
might expect one record at a time.
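
For illustration, the all-or-nothing delivery idea could look roughly like
this (the names here are made up, not the actual SinkTask API):

import java.util.List;

class AllOrNothingDelivery {

    interface ExpandingTransformer<R> {
        List<R> transform(R record);  // one consumed record -> zero or more
    }

    interface SinkLike<R> {
        void put(List<R> records);    // fully accept or throw
    }

    // Deliver the full expansion of one consumed record in a single put().
    // If the sink throws, nothing from this Kafka offset counts as
    // delivered, so committed offsets always land on message boundaries.
    static <R> long process(ExpandingTransformer<R> transformer,
                            SinkLike<R> sink, R consumed, long kafkaOffset) {
        sink.put(transformer.transform(consumed));
        return kafkaOffset + 1;  // the next offset is now safe to commit
    }
}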

I'm not really convinced of the benefit of supporting this -- at some point
it seems better to use Streams to do transformations if you need flatMap. I
can't think of many generic transformations that would be 1-to-many, and
single message transforms really should be quite general -- that's the
reason for providing a separate interface isolated from Connectors or
Converters.

Gwen, re: using null and sending to dead letter queue, it would be useful
to think about how this might interact with other uses of a dead letter
queue. Similar ideas have been raised for messages that either can't be
parsed or which the connector chokes on repeatedly. If we use a dead letter
queue for those, do we want these messages (which are explicitly filtered
by a transform set up by the user) to end up in the same location?
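
If they did share a queue, consumers of it would at least need a way to
tell the cases apart -- for example, tagging each dead-lettered record with
a reason (purely a sketch, nothing like this exists today):

enum DeadLetterReason {
    DESERIALIZATION_FAILED,  // message couldn't be parsed
    CONNECTOR_FAILED,        // connector choked on it repeatedly
    FILTERED_BY_TRANSFORM    // explicitly dropped by a user-configured transform
}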

-Ewen

On Sun, Jul 17, 2016 at 9:53 PM, Jun Rao <j...@confluent.io> wrote:

> Does the transformation need to be 1-to-1? For example, some users model
> each Kafka message as schema + a batch of binary records. When using a sink
> connector to push the Kafka data to a sink, it would be useful if the
> transformer can convert each Kafka message to multiple records.
>
> Thanks,
>
> Jun
>
> On Sat, Jul 16, 2016 at 1:25 PM, Nisarg Shah <snis...@gmail.com> wrote:
>
> > Gwen,
> >
> > Yup, that sounds great! Instead of leaving it up to the transformers to
> > handle null, we can instead have the topic as null. Sounds good. To get
> > rid of a message, set the topic to a special one (could be as simple as
> > null).
> >
> > Like I said before, the more interesting part would be ‘adding’ a new
> > message to the existing list, based on say the current message in the
> > transformer. Does that feature warrant to be included?
> >
> > > On Jul 14, 2016, at 22:25, Gwen Shapira <g...@confluent.io> wrote:
> > >
> > > I used to work on Apache Flume, where we used to allow users to filter
> > > messages completely in the transformation and then we got rid of it,
> > > because we spent too much time trying to help users who had "message
> > > loss", where the loss was actually a bug in the filter...
> > >
> > > What we couldn't do in Flume, but perhaps can do in the simple
> > > transform for Connect is the ability to route messages to different
> > > topics, with "null" as one of the possible targets. This will allow
> > > you to implement a dead-letter-queue functionality and redirect
> > > messages that don't pass filter to an "errors" topic without getting
> > > rid of them completely, while also allowing braver users to get rid of
> > > messages by directing them to "null".
> > >
> > > Does that make sense?
> > >
> > > Gwen
> > >
> > > On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah <snis...@gmail.com>
> wrote:
> > >> Thank you for your inputs Gwen and Michael.
> > >>
> > >> The original reason why I suggested set-based processing is the
> > >> flexibility it provides. The JIRA had a comment by a user requesting a
> > >> feature that could be achieved with this.
> > >>
> > >> After reading Gwen and Michael's points (I went through the
> > >> documentation and the code in detail), I agree with what you have to
> > >> say. Also, fewer guarantees make what I had in mind less certain, and
> > >> thus simplifying it to a single-message-based transformation would
> > >> ensure that users who do require more flexibility with their
> > >> transformations will automatically "turn to" Kafka Streams.
> > >> Transformation logic on a message-by-message basis makes more sense.
> > >>
> > >> One use case that Kafka Connect could consider is adding or removing
> > >> a message completely. (This was trivially possible when passing
> > >> collections.) Should users be pointed towards Kafka Streams even for
> > >> this use case? I think this is a very useful feature for Connect too,
> > >> and I'll try to rethink the API as well.
> > >>
> > >> Removing a message is as easy as returning a null and having the next
> > >> transformer skip it, but adding messages would involve, say, a queue
> > >> between transformers and a method which says "pass message" to the
> > >> next, which can be called multiple times from one "transform" function;
> > >> a variation on the chain of responsibility design pattern.
> > >>
> > >>> On Jul 12, 2016, at 12:54 AM, Michael Noll <mich...@confluent.io>
> > wrote:
> > >>>
> > >>> As Gwen said, my initial thought is that message transformations that
> > >>> are "more than trivial" should be done by Kafka Streams rather than
> > >>> by Kafka Connect (for the reasons that Gwen mentioned).
> > >>>
> > >>> Transforming one message at a time would be a good fit for Kafka
> > >>> Connect. An important use case is to remove sensitive data (such as
> > >>> PII) from an incoming data stream before it hits Kafka's persistent
> > >>> storage -- this use case can't be implemented well with Kafka Streams
> > >>> because, by design, Kafka Streams is meant to read its input data
> > >>> from Kafka (i.e. at the point when Kafka Streams could be used to
> > >>> remove sensitive data fields, the data is already stored persistently
> > >>> in Kafka, and this might be a no-go depending on the use case).
> > >>>
> > >>> I'm of course interested to hear what other people think.
> > >>>
> > >>>
> > >>> On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira <g...@confluent.io>
> > wrote:
> > >>>
> > >>>> I think we need to restrict the functionality to
> > >>>> one-message-at-a-time.
> > >>>>
> > >>>> Basically, Connect gives very few guarantees about the size or
> > >>>> composition of the set (you may get the same messages over and over,
> > >>>> a mix of old and new, etc.)
> > >>>>
> > >>>> In order to do useful things over a collection, you need better
> > >>>> defined semantics of what's included. Kafka Streams is putting tons
> > >>>> of effort into having good windowing semantics, and I think apps
> > >>>> that require modification of collections are a better fit there.
> > >>>>
> > >>>> I'm willing to change my mind though (it has been known to happen) --
> > >>>> what are the comments about usage that point toward the collections
> > >>>> approach?
> > >>>>
> > >>>> Gwen
> > >>>>
> > >>>> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah <snis...@gmail.com>
> > wrote:
> > >>>>
> > >>>>> Thanks Jay, added that to the KIP.
> > >>>>>
> > >>>>> Besides reviewing the KIP as a whole, I wanted to know what
> > >>>>> everyone thinks about what data should be dealt with at the
> > >>>>> Transformer level: transform the whole Collection of Records
> > >>>>> (giving the flexibility of modifying messages across the set), OR
> > >>>>> transform messages one at a time, iteratively, which will restrict
> > >>>>> modifications across messages.
> > >>>>>
> > >>>>> I'll get a working sample ready soon, to have a look. There were
> > >>>>> some comments about Transformer usage that pointed to the first
> > >>>>> approach, which I prefer too, given the flexibility.
> > >>>>>
> > >>>>>> On Jul 11, 2016, at 2:49 PM, Jay Kreps <j...@confluent.io> wrote:
> > >>>>>>
> > >>>>>> One minor thing: the Transformer interface probably needs a
> > >>>>>> close() method (i.e. the opposite of initialize). This would be
> > >>>>>> used for any transformer that uses a resource like a file/socket/db
> > >>>>>> connection/etc. that needs to be closed. You usually don't need
> > >>>>>> this, but when you do need it you really need it.
> > >>>>>>
> > >>>>>> -Jay
> > >>>>>>
> > >>>>>> On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah <snis...@gmail.com>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Hello,
> > >>>>>>>
> > >>>>>>> This KIP <
> > >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages
> > >>>>>>> > is for KAFKA-3209 <https://issues.apache.org/jira/browse/KAFKA-3209>.
> > >>>>>>> It's about capabilities to transform messages in Kafka Connect.
> > >>>>>>>
> > >>>>>>> Some design decisions need to be taken, so please advise me on
> > >>>>>>> the same.
> > >>>>>>> Feel free to express any thoughts or concerns as well.
> > >>>>>>>
> > >>>>>>> Many many thanks to Ewen Cheslack-Postava.
> > >>>>>>>
> > >>>>>>> -Nisarg
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> Best regards,
> > >>> Michael Noll
> > >>>
> > >>>
> > >>>
> > >>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860*
> > >>> *Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > >>> <http://www.confluent.io/download>*
> > >>
> >
> >
>



-- 
Thanks,
Ewen
