Hi Tom,
Referring back to Chris's points.

1) Exception handling - yes, agreed. This KIP can just say that exception
handling for predicates is the same as for transformations.

2) Worker plugins - agree too.

3) Filter SMT - If the Filter SMT is just becoming a transformation that drops
any records that make it through the predicate, it's probably really a Drop
transformation. If it's not guarded by a predicate, it drops every record, but
with a predicate it drops only the records that match.
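
To make the Drop idea concrete, here is a rough sketch. It is not taken from the KIP: Rec and Transformation are simplified stand-ins I made up for illustration rather than the real Connect interfaces, and "returning null means drop" is the convention assumed by the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class DropSketch {
    /** Hypothetical, simplified stand-in for a Connect record: just a topic and a value. */
    static final class Rec {
        final String topic;
        final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a transformation; returning null means "drop the record". */
    interface Transformation {
        Rec apply(Rec r);
    }

    /** Drops everything it sees and takes no configuration of its own. */
    static final Transformation DROP = r -> null;

    /** Applies t only when the guard matches; otherwise passes the record through unchanged. */
    static Transformation guarded(Predicate<Rec> guard, Transformation t) {
        return r -> guard.test(r) ? t.apply(r) : r;
    }

    /** Runs every record through t and keeps the ones that were not dropped. */
    static List<Rec> run(Transformation t, List<Rec> in) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            Rec result = t.apply(r);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> in = List.of(new Rec("my-prefix-a", "1"), new Rec("other", "2"));
        // Unguarded: every record is dropped.
        System.out.println(run(DROP, in).size());
        // Guarded: only records whose topic matches the predicate are dropped.
        Predicate<Rec> hasMyPrefix = r -> r.topic.startsWith("my-prefix-");
        System.out.println(run(guarded(hasMyPrefix, DROP), in).size());
    }
}
```

The point of the sketch is that the transformation itself needs no predicate-related configuration at all; the guard lives entirely outside it.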

4) Syntax - I quite like the idea of a separate namespace for predicates, just
like for transformations. I particularly like how you could have two
transformations referencing the same predicate and config.

There is scope for collision with connector configuration, but my hunch is
that "predicates" is less risky than simply "predicate", since it would only
clash with a connector that happened to have multiple predicates.

But I think there is a non-zero chance that an existing transformation might
have used "predicate" as a config. I have actually written a filter
transformation myself. It didn't use "predicate", but it almost did. That's why
I suggested "?predicate", which I know is a bit cryptic, but it's also not
going to clash.

Here are two serving suggestions:

A) "?predicate" to name the predicate alias to use
transforms: t2
transforms.t2.?predicate: has-my-prefix
transforms.t2.type: ExtractField$Key
transforms.t2.field: c1

B) "?" to name the predicate alias to use
transforms: t2
transforms.t2.?: has-my-prefix
transforms.t2.type: ExtractField$Key
transforms.t2.field: c1
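
With either spelling, the framework would presumably peel the reserved key off before handing the remaining config to the transformation. A minimal sketch of that, assuming option A and assuming the "transforms.t2." prefix has already been stripped (the class and method names are hypothetical, not anything in Connect today):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PredicateKeySketch {
    /** The reserved key from option A; option B would use "?" instead. */
    static final String PREDICATE_KEY = "?predicate";

    /** Returns the predicate alias named by the reserved key, or null if the transformation is unguarded. */
    static String predicateAlias(Map<String, String> transformConfig) {
        return transformConfig.get(PREDICATE_KEY);
    }

    /** Returns a copy of the config without the reserved key, to hand to the transformation itself. */
    static Map<String, String> withoutPredicateKey(Map<String, String> transformConfig) {
        Map<String, String> copy = new LinkedHashMap<>(transformConfig);
        copy.remove(PREDICATE_KEY);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> t2 = new LinkedHashMap<>();
        t2.put("?predicate", "has-my-prefix");
        t2.put("type", "ExtractField$Key");
        t2.put("field", "c1");
        // A transformation that already has its own "predicate" setting is left alone.
        t2.put("predicate", "something-else");

        System.out.println(predicateAlias(t2));
        System.out.println(withoutPredicateKey(t2));
    }
}
```

An existing transformation's own "predicate" setting survives untouched, which is the whole reason for the cryptic prefix.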


One more question.

Do people think the negation ought to be in the predicate config or in the
referring transformation config? We might want to make:

  predicate(a)=>transform(T1)
  predicate(negate(a))=>transform(T2)

easier than having to use two separate predicate aliases with configuration
that is identical except for the negation.
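
As a rough illustration of negating on the referring side, here is a sketch using plain java.util.function.Predicate rather than the interface proposed in the KIP. The names topicNameMatches and route are made up for the example, and T1/T2 are just labels for the two transformations above.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class NegateSketch {
    /** Builds a topic-name predicate from a regex; stands in for one configured predicate alias. */
    static Predicate<String> topicNameMatches(String regex) {
        Pattern p = Pattern.compile(regex);
        return topic -> p.matcher(topic).matches();
    }

    /** Reports which of the two guarded transformations would run for the given topic. */
    static String route(Predicate<String> a, String topic) {
        // The negation is applied by the referrer; the predicate is configured only once.
        Predicate<String> notA = a.negate();
        StringBuilder applied = new StringBuilder();
        if (a.test(topic)) {
            applied.append("T1");
        }
        if (notA.test(topic)) {
            applied.append("T2");
        }
        return applied.toString();
    }

    public static void main(String[] args) {
        Predicate<String> a = topicNameMatches("my-prefix-.*");
        System.out.println(route(a, "my-prefix-orders"));
        System.out.println(route(a, "other"));
    }
}
```

Both transformations share a single definition of "a", so there is no second alias to keep in step with the first.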

Thanks,
Andrew

On 04/05/2020, 10:22, "Tom Bentley" <tbent...@redhat.com> wrote:

    Hi Chris,

    Thanks for the feedback. On points 1 and 2 I agree.

    3. This is a great point. I'm happy to add this to the KIP too, but note it
    removes the need to add validate(Map<String, String>) to both
    Transformation and Predicate. That doesn't mean we shouldn't still do it,
    just that the change is not motivated by the features being added in the
    KIP. So I've removed validate(Map<String, String>) for now.

    4. Your proposal would further reduce the chance of Transformation config
    conflict (because just the 'predicate' parameter might conflict), but it
    also adds the chance of conflict with Connector parameters (all the
    'predicates.*' parameters). Connector implementations are far more numerous
    than Transformation implementations, so the risk is proportionately
    greater. As you say we could try to reduce the chances of conflict (e.g.
    '?predicate' parameter on transformations and '?predicates' prefix on
    connectors), but I'm still left with uneasy feelings. I'd like to hear
    about what others think before I change this one.

    Cheers,

    Tom


    On Mon, May 4, 2020 at 4:57 AM Christopher Egerton <chr...@confluent.io>
    wrote:

    > Hi all,
    >
    > This has certainly taken quite a turn! I wish we could get this done
    > without adding another pluggable interface but if the benefit is that now
    > any SMT--new or pre-existing--can be conditionally applied, it seems like
    > it might be worth it.
    >
    > As far as the proposed design goes, some thoughts:
    >
    > 1. There isn't a clear reason why exceptions thrown from predicates should
    > be treated differently from exceptions thrown by transformations. We have
    > reasonable error-tolerance mechanisms in the Connect framework; why
    > silently swallow errors, especially if the risk is publishing potentially
    > corrupted data to Kafka or an external system because an SMT that should
    > have been applied wasn't?
    >
    > 2. It might be worth noting that the interface is a worker plugin, and that
    > presumably it would be loaded in the same way as other worker plugins such
    > as converters, connectors, and REST extensions. This would include aliasing
    > behavior that would allow users to specify predicates using their simple
    > class names as long as no two predicate plugins with the same simple name
    > were available on the worker.
    >
    > 3. Why would the Filter SMT explicitly take in a predicate in its
    > configuration if predicates can be applied to all SMTs? Just reading the
    > idea for a filter SMT, it seemed like the behavior would be that the SMT
    > would take in no configuration parameters and just blindly drop everything
    > that it sees, but typical use would involve pairing it with a predicate.
    >
    > 4. The question mark syntax seems a little cryptic, and combining the
    > configuration properties for the predicate and the transformation in the
    > same namespace ("transforms.<name>.*") seems a little noisy. What do you
    > think about allowing predicates to be configured in their own namespace,
    > and referenced by name in a single "predicate" (or maybe "?predicate" if we
    > really want to avoid risking backwards compatibility concerns) property?
    > This would also enable them to be more easily reused across several SMTs.
    >
    > For example, you might configure a worker with these properties (assuming
    > plugin aliasing is supported for predicates in the same way that it is for
    > transformations):
    >
    > transforms: t2
    > transforms.t2.predicate: has-my-prefix
    > transforms.t2.type:ExtractField$Key
    > transforms.t2.field: c1
    >
    > predicates: has-my-prefix
    > predicates.has-my-prefix.type: TopicNameMatch
    > predicates.has-my-prefix.negate: true
    > predicates.has-my-prefix.pattern: my-prefix-.*
    >
    > Excited to see how this is evolving and I think we're heading towards
    > something really useful for the Connect framework.
    >
    > Cheers,
    >
    > Chris
    >
    >
    > On Fri, May 1, 2020 at 9:51 AM Andrew Schofield <andrew_schofi...@live.com>
    > wrote:
    >
    > > Hi Tom,
    > > Looks good to me.
    > >
    > > Thanks,
    > > Andrew
    > >
    > > On 01/05/2020, 16:24, "Tom Bentley" <tbent...@redhat.com> wrote:
    > >
    > >     Hi,
    > >
    > >     I've updated the KIP to reflect recent discussions. Please note the
    > >     following:
    > >
    > >     1. I've described a HasHeaderKey predicate rather than HeaderKeyMatches
    > >     because at the moment Headers doesn't expose the whole set of headers,
    > >     only those with a specific name. Exposing this would significantly
    > >     increase the scope of this KIP for relatively little extra benefit.
    > >     2. I used org.apache.kafka.connect.transformer.predicates rather than
    > >     org.apache.kafka.connect.predicates, since I thought it better reflected
    > >     the use of predicates within transforms. I'm flexible on this one if
    > >     someone has a good case for the original name. For example the original
    > >     package name would be more appropriate if we were expecting Connectors
    > >     to make use of Predicates somehow.
    > >     3. I've dropped the special case of not needing to provide a FQN for
    > >     predicate classes in that package, since this isn't something supported
    > >     by transformations themselves as far as I know. I like the idea, but it
    > >     seemed inconsistent to need it for transformations but not for predicates.
    > >     4. Chris, I've used your suggestions for a validate() method for the
    > >     extra configurability needed. I've also added this to Predicate (even
    > >     though I don't need it) for consistency with Connector and Transformation.
    > >     5. For negating the result of a predicate I decided it was clearer to
    > >     just have a "negate" config parameter, supported by all predicates, than
    > >     to reach for more punctuation.
    > >
    > >     I'd be grateful for any more feedback people might have.
    > >
    > >     Kind regards,
    > >
    > >     Tom
    > >
    > >     On Tue, Apr 28, 2020 at 3:50 PM Andrew Schofield <andrew_schofi...@live.com>
    > >     wrote:
    > >
    > >     > Hi Tom,
    > >     > I think we should go for a consistent naming convention for the
    > >     > predicates, maybe so they read nicely if you say "IF" first. Here's a
    > >     > suggestion for predicates that the KIP could introduce:
    > >     >  - TopicNameMatches
    > >     >  - HeaderKeyMatches
    > >     >  - RecordIsTombstone
    > >     >
    > >     > On naming, I also suggest using
    > >     > org.apache.kafka.connect.predicates.Predicate
    > >     > as the name for the interface.
    > >     >
    > >     > I hadn't settled on a preference for negation. I suppose there are
    > >     > three results from evaluating a predicate - true, false or an exception
    > >     > caught by Kafka Connect. I suggest the exception case is always a
    > >     > predicate fail so the guarded transformation is skipped. Initially I'd
    > >     > preferred having the predicate be configured to return a negated result
    > >     > to give a "not match" behaviour. Thinking again now, I prefer something
    > >     > like:
    > >     >
    > >     > transforms: t1
    > >     > transforms.t1.?type: RecordIsTombstone
    > >     > transforms.t1.type: org.apache.kafka.connect.transforms.InsertField$Value
    > >     > transforms.t1.static.field: deleted
    > >     > transforms.t1.static.value: true
    > >     >
    > >     > The "?" means the transform guarded by the predicate runs if the
    > >     > Predicate returns true.
    > >     >
    > >     > transforms: t2
    > >     > transforms.t2.?!type: org.apache.kafka.connect.predicates.TopicNameMatch
    > >     > transforms.t2.?regex: my-prefix-.*
    > >     > transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
    > >     > transforms.t2.field: c1
    > >     >
    > >     > The "?!" means the transform guarded by the predicate runs if the
    > >     > Predicate returns false.
    > >     >
    > >     > I think adding a Filter SMT that uses a Predicate is a nice idea.
    > >     >
    > >     > Cheers,
    > >     > Andrew
    > >     >
    >
    >
