On Thu, Jul 28, 2016 at 1:13 PM, Shikhar Bhushan <shik...@confluent.io>
wrote:

> Some thoughts on the KIP and single-message transforms in general.
>
> * When does transformation take place? In the KIP, it seems like the
> connector-implemented task is responsible for calling into the
> transformation logic. I'd propose that,
>   - for source connectors, the transformer chain operates on the
> SourceRecord's generated by a call to SourceTask.poll(), and are called by
> the connect framework after getting them from the task before writing to
> Kafka
>   - for sink connectors, the transformer chain operates on SinkRecord's and
> are called by the connect framework after generating the records from Kafka
> and before they are passed on to SinkTask.put()
>
> I think this addresses the type-related questions raised in the KIP.
> Transformers would operate on ConnectRecord's (the base class for
> SourceRecord & SinkRecord) and not have any generic type parameters.
>

Hmm, operating on ConnectRecords probably doesn't work since you need to
emit the right type of record, which might mean instantiating a new one. I
think that means we either need 2 methods, one for SourceRecord, one for
SinkRecord, or we'd need to limit what parts of the message you can modify
(e.g. you can change the key/value via something like
transformKey(ConnectRecord) and transformValue(ConnectRecord), but other
fields would remain the same and the fmwk would handle allocating new
Source/SinkRecords if needed).


>
> * There is loss of information wrt what is visible to sink connectors as
> records go through the transformation chain, unless we hang on to the
> previous states in some way, e.g. by having a field for the pre-transform
> 'parent' record. This way it is possible to get at the original record even
> if there have been a number of transformations by tracing along the
> parents. Keeping this state around does have memory usage implications.
>

Is there a use case for hanging on to the original? I can't think of a
transformation where you'd need to do that (or couldn't just order things
differently so it isn't a problem).


>
> * There is often a requirement for being able to flexibly map from source
> entity like db table, filename etc. to the Kafka topic name in source
> connectors, and Kafka topic name to the db table, hdfs dir, etc. in sink
> connectors. We currently have various different kinds of configuration
> styles being used by connectors for this purpose, e.g. a prefix property.
> It'd be good to add some semantics or clear expectations around
> transformers being able to serve this use-case, with the framework
> providing a standard transformer for this purpose that can be e.g.
> configured with regex substitutions.
>
> Perhaps transformers could just rely on overriding the topic on the record,
> which seems ugly but is semantically compatible with all the use-cases I
> imagine:
> - for source connectors the destination is indeed the topic so this is just
> fine
> - for sink connectors they're basing the destination on topic in some way,
> and will now just use the transformed topic-name as a basis
>

Yeah, this seem fine to me. In fact, maybe eventually connectors won't even
need to fill in the topic at all -- it could always be specified by a
user-determined transformation, or maybe they provide a sane default but it
is easy to override with a transformer.

That said, I do worry a bit that farming too much stuff out to transformers
can result in "programming via config", i.e. a lot of the simplicity you
get from Connect disappears in long config files. Standardization would be
nice and might just avoid this (and doesn't cost that much implementing it
in each connector), and I'd personally prefer something a bit less flexible
but consistent and easy to configure.


>
> * The above destination mapping takes care of dead-letter queues as far as
> transformers may want to care about that. For filtering / discarding of
> records -- I agree with not making the API support flatMap due to the
> complications Ewen pointed out. I think we could have
> transformer.transform() (or whatever the map function is called) return
> null (given this is Java, null seems... idiomatic, unless we're on JDK8
> soon in which case we can use Optional).
>

The other option is a sentinel, e.g. something like Transformer.DROP_RECORD.


>
> * The transformer API in the KIP takes in Map<String, String> as
> configuration properties. This is consistent with how we are configuring
> everything else in connect, but as I think about more complex transformers
> I'd imagine wanting to supply multi-line scripts as a transformation spec
> e.g. Lua <https://github.com/rollulus/kafka-streams-plumber>. I suppose
> these kinds of transformers could expect the configuration property to be a
> resource file that needs to be on the classpath. Just thinking out loud
> here. I can certainly imagine kcql
> <https://github.com/datamountaineer/kafka-connect-query-language> working
> well as a transformer without needing an external resource for the spec, as
> it is designed to be configured from within the confines of a property
> value.
>

Personally I'm skeptical of that level of flexibility in transformers --
its getting awfully complex and certainly takes us pretty far from "config
only" realtime data integration. It's not clear to me what the use cases
are that aren't covered by a small set of common transformations that can
be chained together (e.g. rename/remove fields, mask values, and maybe a
couple more).

In any case, we'd probably also have to change configs of connectors if we
allowed configs like that since presumably transformer configs will just be
part of the connector config.


-Ewen


> On Mon, Jul 25, 2016 at 1:08 AM Michael Noll <mich...@confluent.io> wrote:
>
> > API question regarding dead letter queues / sending messages to a null
> > topic in order to get rid of them:  I assume we wouldn't suggest users to
> > actually pass `null` into some method, but rather have a proper and
> > descriptive API method such as `discard()` (this name is just an
> example)?
> >
> > On Sat, Jul 23, 2016 at 11:13 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > wrote:
> >
> > > On Fri, Jul 22, 2016 at 12:58 AM, Shikhar Bhushan <
> shik...@confluent.io>
> > > wrote:
> > >
> > > > flatMap() / supporting 1->n feels nice and general since filtering is
> > > just
> > > > the case of going from 1->0
> > > >
> > > > I'm not sure why we'd need to do any more granular offset tracking
> > (like
> > > > sub-offsets) for source connectors: after transformation of a given
> > > record
> > > > to n records, all of those n should map to same offset of the source
> > > > partition. The only thing to take care of here would be that we don't
> > > > commit a source offset while there are still records with that offset
> > > that
> > > > haven't been flushed to Kafka, but this is in the control of the
> > connect
> > > > runtime.
> > > >
> > > >
> > > I'd like to be forward thinking with this and make sure we can get
> > exactly
> > > once delivery when the producer can support it. For that you need to be
> > > able to track offsets at the granularity you actually publish messages
> to
> > > Kafka (or at least I can't think of a way of making it work without
> > > tracking them at that granularity).
> > >
> > > -Ewen
> > >
> > >
> > > > I see your point for sink connectors, though. Implementors can
> > currently
> > > > assume 1:1ness of a record to its Kafka coordinates (topic,
> partition,
> > > > offset).
> > > >
> > > > On Thu, Jul 21, 2016 at 10:57 PM Ewen Cheslack-Postava <
> > > e...@confluent.io>
> > > > wrote:
> > > >
> > > > > Jun, The problem with it not being 1-1 is that Connect relies
> heavily
> > > on
> > > > > offsets, so we'd need to be able to track offsets at this finer
> > > > > granularity. Filtering is ok, but flatMap isn't. If you convert one
> > > > message
> > > > > to many, what are the offsets for the new messages? One possibility
> > > would
> > > > > be to assume that transformations are deterministic and then
> > "enhance"
> > > > the
> > > > > offsets with an extra integer field that indicates its position in
> > the
> > > > > subset. For sources this seems attractive since you can then reset
> to
> > > > > whatever the connector-provided offset is and then filter out any
> of
> > > the
> > > > > "sub"-messages that are earlier than the recorded "sub"-offset. But
> > > this
> > > > > might not actually work for sources since a) the offsets will
> include
> > > > extra
> > > > > fields that the connector doesn't expect (might be ok since we
> handle
> > > > that
> > > > > data as schemaless anyway) and b) if we allow multiple
> > transformations
> > > > > (which seems likely given that people might want to do things like
> > > > > rearrange fields + filter messages) then offsets start getting
> quite
> > > > > complex as we add sub-sub-offsets and sub-sub-sub-offsets. It's
> > doable,
> > > > but
> > > > > seems messy.
> > > > >
> > > > > Things aren't as easy on the sink side. Since we track offsets
> using
> > > > Kafka
> > > > > offsets we either need to use the extra metadata space to store the
> > > > > sub-offsets or we need to ensure that we only ever need to commit
> > > offsets
> > > > > on Kafka message boundaries. We might be able to get away with just
> > > > > delivering the entire set of generated messages in a single put()
> > call,
> > > > > which the connector is expected to either fully accept or fully
> > reject
> > > > (via
> > > > > exception). However, this may end up interacting poorly with
> > > assumptions
> > > > > connectors might make if we expose things like max.poll.records,
> > where
> > > > they
> > > > > might expect one record at a time.
> > > > >
> > > > > I'm not really convinced of the benefit of support this -- at some
> > > point
> > > > it
> > > > > seems better to use Streams to do transformations if you need
> > flatMap.
> > > I
> > > > > can't think of many generic transformations that would use
> 1-to-many,
> > > and
> > > > > single message transforms really should be quite general -- that's
> > the
> > > > > reason for providing a separate interface isolated from Connectors
> or
> > > > > Converters.
> > > > >
> > > > > Gwen, re: using null and sending to dead letter queue, it would be
> > > useful
> > > > > to think about how this might interact with other uses of a dead
> > letter
> > > > > queue. Similar ideas have been raised for messages that either
> can't
> > be
> > > > > parsed or which the connector chokes on repeatedly. If we use a
> dead
> > > > letter
> > > > > queue for those, do we want these messages (which are explicitly
> > > filtered
> > > > > by a transform setup by the user) to end up in the same location?
> > > > >
> > > > > -Ewen
> > > > >
> > > > > On Sun, Jul 17, 2016 at 9:53 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Does the transformation need to be 1-to-1? For example, some
> users
> > > > model
> > > > > > each Kafka message as schema + a batch of binary records. When
> > using
> > > a
> > > > > sink
> > > > > > connector to push the Kafka data to a sink, if would be useful if
> > the
> > > > > > transformer can convert each Kafka message to multiple records.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Sat, Jul 16, 2016 at 1:25 PM, Nisarg Shah <snis...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Gwen,
> > > > > > >
> > > > > > > Yup, that sounds great! Instead of keeping it up to the
> > > transformers
> > > > to
> > > > > > > handle null, we can instead have the topic as null. Sounds
> good.
> > To
> > > > get
> > > > > > rid
> > > > > > > of a message, set the topic to a special one (could be as
> simple
> > as
> > > > > > null).
> > > > > > >
> > > > > > > Like I said before, the more interesting part would be
> ‘adding’ a
> > > new
> > > > > > > message to the existing list, based on say the current message
> in
> > > the
> > > > > > > transformer. Does that feature warrant to be included?
> > > > > > >
> > > > > > > > On Jul 14, 2016, at 22:25, Gwen Shapira <g...@confluent.io>
> > > wrote:
> > > > > > > >
> > > > > > > > I used to work on Apache Flume, where we used to allow users
> to
> > > > > filter
> > > > > > > > messages completely in the transformation and then we got rid
> > of
> > > > it,
> > > > > > > > because we spent too much time trying to help users who had
> > > > "message
> > > > > > > > loss", where the loss was actually a bug in the filter...
> > > > > > > >
> > > > > > > > What we couldn't do in Flume, but perhaps can do in the
> simple
> > > > > > > > transform for Connect is the ability to route messages to
> > > different
> > > > > > > > topics, with "null" as one of the possible targets. This will
> > > allow
> > > > > > > > you to implement a dead-letter-queue functionality and
> redirect
> > > > > > > > messages that don't pass filter to an "errors" topic without
> > > > getting
> > > > > > > > rid of them completely, while also allowing braver users to
> get
> > > rid
> > > > > of
> > > > > > > > messages by directing them to "null".
> > > > > > > >
> > > > > > > > Does that make sense?
> > > > > > > >
> > > > > > > > Gwen
> > > > > > > >
> > > > > > > > On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah <
> > snis...@gmail.com>
> > > > > > wrote:
> > > > > > > >> Thank you for your inputs Gwen and Michael.
> > > > > > > >>
> > > > > > > >> The original reason why I suggested a set based processing
> is
> > > > > because
> > > > > > > of the flexibility is provides. The JIRA had a comment by a
> user
> > > > > > requesting
> > > > > > > a feature that could be achieved with this.
> > > > > > > >>
> > > > > > > >> After reading Gwen and Michael's points, (I went through the
> > > > > > > documentation and the code in detail) and agree with what you
> > have
> > > to
> > > > > > say.
> > > > > > > Also, fewer guarantees make what I had in mind less certain and
> > > thus
> > > > > > > simplifying it to a single message based transformation would
> > > ensure
> > > > > that
> > > > > > > users who do require more flexibility with the transformations
> > will
> > > > > > > automatically “turn to" Kafka Streams. The transformation logic
> > on
> > > a
> > > > > > > message by message basis makes more sense.
> > > > > > > >>
> > > > > > > >> One usecase that Kafka Connect could consider is adding or
> > > > removing
> > > > > a
> > > > > > > message completely. (This was trivially possible with the
> > > collection
> > > > > > > passing). Should users be pointed towards Kafka Streams even
> for
> > > this
> > > > > use
> > > > > > > case? I think this is a very useful feature for Connect too,
> and
> > > I’ll
> > > > > try
> > > > > > > to rethink on the API too.
> > > > > > > >>
> > > > > > > >> Removing a message is as easy as returning a null and having
> > the
> > > > > next
> > > > > > > transformer skip it, but adding messages would involve say a
> > queue
> > > > > > between
> > > > > > > transformers and a method which says “pass message” to the
> next,
> > > > which
> > > > > > can
> > > > > > > be called multiple times from one “transform” function; a
> > variation
> > > > on
> > > > > > the
> > > > > > > chain of responsibility design pattern.
> > > > > > > >>
> > > > > > > >>> On Jul 12, 2016, at 12:54 AM, Michael Noll <
> > > mich...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > >>>
> > > > > > > >>> As Gwen said, my initial thought is that message
> > > transformations
> > > > > that
> > > > > > > are
> > > > > > > >>> "more than trivial" should rather be done by Kafka Streams,
> > > > rather
> > > > > > > than by
> > > > > > > >>> Kafka Connect (for the reasons that Gwen mentioned).
> > > > > > > >>>
> > > > > > > >>> Transforming one message at a time would be a good fit for
> > > Kafka
> > > > > > > Connect.
> > > > > > > >>> An important use case is to remove sensitive data (such as
> > PII)
> > > > > from
> > > > > > an
> > > > > > > >>> incoming data stream before it hits Kafka's persistent
> > storage
> > > --
> > > > > > this
> > > > > > > use
> > > > > > > >>> case can't be implemented well with Kafka Streams because,
> by
> > > > > design,
> > > > > > > Kafka
> > > > > > > >>> Streams is meant to read its input data from Kafka (i.e. at
> > the
> > > > > point
> > > > > > > when
> > > > > > > >>> Kafka Streams could be used to removed sensitive data
> fields
> > > the
> > > > > data
> > > > > > > is
> > > > > > > >>> already stored persistently in Kafka, and this might be a
> > no-go
> > > > > > > depending
> > > > > > > >>> on the use case).
> > > > > > > >>>
> > > > > > > >>> I'm of course interested to hear what other people think.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira <
> > > g...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > >>>
> > > > > > > >>>> I think we need to restrict the functionality to
> > > > > > > one-message-at-a-time.
> > > > > > > >>>>
> > > > > > > >>>> Basically, connect gives very little guarantees about the
> > size
> > > > of
> > > > > > the
> > > > > > > set
> > > > > > > >>>> of the composition (you may get same messages over and
> over,
> > > mix
> > > > > of
> > > > > > > old and
> > > > > > > >>>> new, etc)
> > > > > > > >>>>
> > > > > > > >>>> In order to do useful things over a collection, you need
> > > better
> > > > > > > defined
> > > > > > > >>>> semantics of what's included. Kafka Streams is putting
> tons
> > of
> > > > > > effort
> > > > > > > into
> > > > > > > >>>> having good windowing semantics, and I think apps that
> > require
> > > > > > > modification
> > > > > > > >>>> of collections are a better fit there.
> > > > > > > >>>>
> > > > > > > >>>> I'm willing to change my mind though (have been known to
> > > > happen) -
> > > > > > > what are
> > > > > > > >>>> the comments about usage that point toward the collections
> > > > > approach?
> > > > > > > >>>>
> > > > > > > >>>> Gwen
> > > > > > > >>>>
> > > > > > > >>>> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah <
> > > snis...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >>>>
> > > > > > > >>>>> Thanks Jay, added that to the KIP.
> > > > > > > >>>>>
> > > > > > > >>>>> Besides reviewing the KIP as a whole, I wanted to know
> > about
> > > > what
> > > > > > > >>>> everyone
> > > > > > > >>>>> thinks about what data should be dealt at the Transformer
> > > > level.
> > > > > > > >>>> Transform
> > > > > > > >>>>> the whole Collection of Records (giving the flexibility
> of
> > > > > > modifying
> > > > > > > >>>>> messages across the set) OR
> > > > > > > >>>>> Transform messages one at a time, iteratively. This will
> > > > restrict
> > > > > > > >>>>> modifications across messages.
> > > > > > > >>>>>
> > > > > > > >>>>> I’ll get a working sample ready soon, to have a look.
> There
> > > > were
> > > > > > some
> > > > > > > >>>>> comments about Transformer usage that pointed to the
> first
> > > > > > approach,
> > > > > > > >>>> which
> > > > > > > >>>>> I prefer too given the flexibility.
> > > > > > > >>>>>
> > > > > > > >>>>>> On Jul 11, 2016, at 2:49 PM, Jay Kreps <
> j...@confluent.io>
> > > > > wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>> One minor thing, the Transformer interface probably
> needs
> > a
> > > > > > close()
> > > > > > > >>>>> method
> > > > > > > >>>>>> (i.e. the opposite of initialize). This would be used
> for
> > > any
> > > > > > > >>>> transformer
> > > > > > > >>>>>> that uses a resource like a file/socket/db
> connection/etc
> > > that
> > > > > > > needs to
> > > > > > > >>>>> be
> > > > > > > >>>>>> closed. You usually don't need this but when you do need
> > it
> > > > you
> > > > > > > really
> > > > > > > >>>>> need
> > > > > > > >>>>>> it.
> > > > > > > >>>>>>
> > > > > > > >>>>>> -Jay
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Mon, Jul 11, 2016 at 1:47 PM, Nisarg Shah <
> > > > snis...@gmail.com
> > > > > >
> > > > > > > >>>> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hello,
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> This KIP <
> > > > > > > >>>>>>>
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-66:+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages
> > > > > > > >>>>>>
> > > > > > > >>>>>>> is for KAFKA-3209 <
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-3209>.
> > > > > > > >>>>>>> It’s about capabilities to transform messages in Kafka
> > > > Connect.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Some design decisions need to be taken, so please
> advise
> > me
> > > > on
> > > > > > the
> > > > > > > >>>> same.
> > > > > > > >>>>>>> Feel free to express any thoughts or concerns as well.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Many many thanks to Ewen Cheslack-Postava.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> -Nisarg
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> --
> > > > > > > >>> Best regards,
> > > > > > > >>> Michael Noll
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> *Michael G. Noll | Product Manager | Confluent | +1
> > > > > > > 650.453.5860Download
> > > > > > > >>> Apache Kafka and Confluent Platform:
> > www.confluent.io/download
> > > > > > > >>> <http://www.confluent.io/download>*
> > > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Ewen
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> >
> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
> > Apache Kafka and Confluent Platform: www.confluent.io/download
> > <http://www.confluent.io/download>*
> >
>



-- 
Thanks,
Ewen

Reply via email to