Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-02-14 Thread Paul Whalen
No specific comments, but I just wanted to mention I like the direction of
the KIP.  My team is a big user of "transform" methods because of the
ability to chain them, and I have always found the terminology challenging
to explain alongside "process".  It felt like one concept with two names.
So moving towards a single API that is powerful enough to handle both use
cases seems absolutely correct to me.
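
For what it's worth, here's a sketch of how I'd hope a unified API chains,
going by my reading of the KIP (the supplier classes are hypothetical
stand-ins, and the signatures are only as proposed, not final):

```java
// Chaining under the proposed API: process()/processValues() return a
// KStream, so they compose the way transform()/transformValues() do today.
// MyEnricher and MyFilter are hypothetical user-defined suppliers.
KStream<String, String> output = builder.<String, String>stream("input")
    .processValues(MyEnricher::new)  // would replace transformValues()
    .process(MyFilter::new);         // would replace transform()
```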

Paul

On Mon, Feb 14, 2022 at 1:12 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Got it. Thanks John, this makes sense.
>
> I've updated the KIP to include the deprecation of:
>
>- KStream#transform
>- KStream#transformValues
>- KStream#flatTransform
>- KStream#flatTransformValues
>
>
>
> On Fri, 11 Feb 2022 at 15:16, John Roesler  wrote:
>
> > Thanks, Jorge!
> >
> > I think it’ll be better to keep this KIP focused on KStream methods only.
> > I suspect that the KTable methods may be more complicated than just that
> > proposed replacement, but it’ll also be easier to consider that question
> in
> > isolation.
> >
> > The nice thing about just deprecating the KStream methods and not the
> > Transform* interfaces is that you can keep your proposal just scoped to
> > KStream and not have any consequences for the rest of the DSL.
> >
> > Thanks again,
> > John
> >
> > On Fri, Feb 11, 2022, at 06:43, Jorge Esteban Quilcate Otoya wrote:
> > > Thanks, John.
> > >
> > >> 4) I agree that we shouldn't deprecate the Transformer*
> > > classes, but do you think we should deprecate the
> > > KStream#transform* methods? I'm curious if there's any
> > > remaining reason to have those methods, or if your KIP
> > > completely obviates them.
> > >
> > > Good catch.
> > > I considered that deprecating `Transformer*` and `transform*` would go
> > hand
> > > in hand — maybe it happened similarly with old `Processor` and
> `process`?
> > > Though deprecating only the `transform*` operations could be a better
> > > signal for users than not deprecating anything at all, and would pave
> > > the way for deprecating the `Transformer*` classes later.
> > >
> > > Should this deprecation also consider including
> `KTable#transformValues`?
> > > The approach proposed on the KIP:
> > > `ktable.toStream().processValues().toTable()` seems fair to me, though
> I
> > > will have to test it further.
> > >
> > > I'm happy to update the KIP if there's some consensus around this.
> > > I will add the deprecation notes in the next few days and wait for any
> > > additional feedback on this topic before wrapping up the KIP.
> > >
> > >
> > > On Fri, 11 Feb 2022 at 04:03, John Roesler 
> wrote:
> > >
> > >> Thanks for the update, Jorge!
> > >>
> > >> I just read over the KIP again, and I'm in support. One more
> > >> question came up for me, though:
> > >>
> > >> 4) I agree that we shouldn't deprecate the Transformer*
> > >> classes, but do you think we should deprecate the
> > >> KStream#transform* methods? I'm curious if there's any
> > >> remaining reason to have those methods, or if your KIP
> > >> completely obviates them.
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >> On Thu, 2022-02-10 at 21:32 +, Jorge Esteban Quilcate
> > >> Otoya wrote:
> > >> > Thank you both for your feedback!
> > >> >
> > >> > I have added the following note on punctuation:
> > >> >
> > >> > ```
> > >> > NOTE: The key validation can be defined when processing the message.
> > >> > With punctuations, though, it won't be possible to define the key for
> > >> > validation before forwarding, therefore it won't be possible to
> > >> > forward from a punctuation.
> > >> > This is similar to how `ValueTransformer`s behave at the moment.
> > >> > ```
> > >> >
> > >> > I also made it explicit that we are going to apply referential
> > >> equality
> > >> > for key validation.
> > >> >
> > >> > I hope this covers all your feedback, let me know if I'm
> missing
> > >> > anything.
> > >> >
> > >> > Cheers,
> > >> > Jorge.
> > >> >
> > >> > On Wed, 9 Feb 2022 at 22:19, Guozhang Wang 
> > wrote:
> > >> >
> > >> > > I'm +1 on John's point 3) for punctuations.
> > >> > >
> > >> > > And if people are on the same page that a reference
> equality
> > >> check
> > >> > > per record is not a huge overhead, I think doing that enforcement
> is
> > >> better
> > >> > > than documentation and hand-wavy undefined behavior.
> > >> > >
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > > On Wed, Feb 9, 2022 at 11:27 AM John Roesler  >
> > >> wrote:
> > >> > >
> > >> > > > Thanks for the KIP Jorge,
> > >> > > >
> > >> > > > I'm in support of your proposal.
> > >> > > >
> > >> > > > 1)
> > >> > > > I do agree with Guozhang's point (1). I think it's
> > >> > > > cleaner and better to keep the
> > >> > > > enforcement internal to the framework than to introduce a
> > >> > > > public API or context wrapper for processors to use
> > >> > > > explicitly.
> > >> > > >
> > >> > > > 2) I tend to agree with you on this one; I think the
> > >> > > > e

Re: [Go] Flight client app metadata access

2021-04-21 Thread Paul Whalen
Whoops, please ignore.  I didn't pay enough attention to my autocomplete
for Apache mailing lists :)

On Wed, Apr 21, 2021 at 11:37 AM Paul Whalen  wrote:

> Hi all,
>
> I'm using the Go Flight client, working off of this example:
> https://github.com/apache/arrow/blob/master/go/arrow/flight/flight_test.go
>
> I've found that there isn't a very convenient way to access the app
> metadata from a Flight stream, because the ipc.Reader you get from calling
> flight.NewRecordReader() only exposes the array.Record as you read data
> from it.  This makes sense to me because only Flight defines the App
> Metadata concept, not the Arrow IPC format on its own.  I've been able to
> work around it by dropping down a level, and building an alternate
> flight.dataMessageReader but this seems subpar.
>
> Am I missing something?  Or is there an opportunity to make the result of
> flight.NewRecordReader() a bit more useful such that it includes the app
> metadata?  Perhaps a type more specific to Flight, not just IPC in
> general.  On the Java side I see that FlightStream has getLatestMetadata()
> which seems like a pattern worth following.
>
> Thanks,
> Paul
>


[Go] Flight client app metadata access

2021-04-21 Thread Paul Whalen
Hi all,

I'm using the Go Flight client, working off of this example:
https://github.com/apache/arrow/blob/master/go/arrow/flight/flight_test.go

I've found that there isn't a very convenient way to access the app
metadata from a Flight stream, because the ipc.Reader you get from calling
flight.NewRecordReader() only exposes the array.Record as you read data
from it.  This makes sense to me because only Flight defines the App
Metadata concept, not the Arrow IPC format on its own.  I've been able to
work around it by dropping down a level, and building an alternate
flight.dataMessageReader but this seems subpar.

Am I missing something?  Or is there an opportunity to make the result of
flight.NewRecordReader() a bit more useful such that it includes the app
metadata?  Perhaps a type more specific to Flight, not just IPC in
general.  On the Java side I see that FlightStream has getLatestMetadata()
which seems like a pattern worth following.

Thanks,
Paul


Re: [DISCUSSION] KIP-686: API to ensure Records policy on the broker

2020-11-30 Thread Paul Whalen
Nikolay,

I'm not a committer, but perhaps I can start the discussion.  I've had the
urge for a similar feature after being bitten by writing a poorly formed
record to a topic - it's natural to want to push schema validation into the
broker, since that's the way regular databases work.  But I'm a bit
skeptical of the complexity it introduces.  Some questions I think would
have to be answered that aren't currently in the KIP:
 - How does the producer get notified of a failure to pass the RecordPolicy
for one or more records, and how should it recover?
 - Assuming a RecordPolicy can be loaded by a broker without restarting it,
what is the mechanism by which this happens?
 - Must writes to replicas also adhere to the RecordPolicy?
 - Must already-written records adhere to the RecordPolicy, if it is
added later?

Also, the rejected alternatives section is blank - I see the status quo as
at least one alternative: in particular, managing schema outside of Kafka
itself using something like the Confluent Schema Registry.  Maybe you can
say why RecordPolicy would be better?
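
To make the discussion concrete, here is roughly what I'd picture an
implementation looking like against the proposed interfaces (the magic-byte
check is purely an illustrative assumption):

```java
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.Record;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.RecordsPolicy;

// Illustrative sketch: reject values that don't start with a 0x0 magic byte.
public class MagicBytePolicy implements RecordsPolicy {
    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public void validate(final String topic, final int partition,
                         final Iterable<Record> records) {
        for (final Record record : records) {
            final ByteBuffer value = record.hasValue() ? record.value() : null;
            if (value == null || !value.hasRemaining()
                    || value.get(value.position()) != 0x0) {
                throw new PolicyViolationException(
                    "Record on " + topic + "-" + partition + " failed validation");
            }
        }
    }

    @Override
    public void close() { }
}
```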

Best,
Paul

On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov  wrote:

> Friendly bump.
>
> Please, share your feedback.
> Do we need this feature in Kafka?
>
> > On 23 Nov 2020, at 12:09, Nikolay Izhikov
> wrote:
> >
> > Hello!
> >
> > Any additional feedback on this KIP?
> > I believe this API can be useful for Kafka users.
> >
> >
> >> On 18 Nov 2020, at 14:47, Nikolay Izhikov
> wrote:
> >>
> >> Hello, Ismael.
> >>
> >> Thanks for the feedback.
> >> You are right, I did not read the public interfaces definition carefully :)
> >>
> >> Updated KIP according to your objection.
> >> I propose to expose 2 new public interfaces:
> >>
> >> ```
> >> package org.apache.kafka.common;
> >>
> >> public interface Record {
> >>   long timestamp();
> >>
> >>   boolean hasKey();
> >>
> >>   ByteBuffer key();
> >>
> >>   boolean hasValue();
> >>
> >>   ByteBuffer value();
> >>
> >>   Header[] headers();
> >> }
> >>
> >> package org.apache.kafka.server.policy;
> >>
> >> public interface RecordsPolicy extends Configurable, AutoCloseable {
> >>   void validate(String topic, int partition, Iterable<Record>
> records) throws PolicyViolationException;
> >> }
> >> ```
> >>
> >> The data exposed in Record and in the validate method itself seems to be
> enough for implementing any reasonable Policy.
> >>
> >>> On 17 Nov 2020, at 19:44, Ismael Juma wrote:
> >>>
> >>> Thanks for the KIP. The policy interface is a small part of this. You
> also
> >>> have to describe the new public API that will be exposed as part of
> this.
> >>> For example, there is no public `Records` class.
> >>>
> >>> Ismael
> >>>
> >>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov 
> wrote:
> >>>
>  Hello.
> 
 I want to start a discussion of KIP-686 [1].
 I propose to introduce a new public interface for it, RecordsPolicy:
> 
>  ```
>  public interface RecordsPolicy extends Configurable, AutoCloseable {
>  void validate(String topic, Records records) throws
>  PolicyViolationException;
>  }
>  ```
> 
>  and a two new configuration options:
>   * `records.policy.class.name: String` - sets class name of the
>  implementation of RecordsPolicy for the specific topic.
>   * `records.policy.enabled: Boolean` - enable or disable records
> policy
>  for the topic.
> 
>  If `records.policy.enabled=true` then an instance of the
> `RecordsPolicy`
>  should check each Records batch before applying data to the log.
 If `PolicyViolationException` is thrown from the
> `RecordsPolicy#validate`
 method, then no data is added to the log and the client receives an error.
> 
>  Motivation:
> 
 During the adoption of Kafka in large enterprises, it's important to
 guarantee that data in some topic conforms to a specific format.
 When data is written and read by different applications
> developed by
 different teams, it's hard to guarantee the data format using only
> custom
 SerDes, because malicious applications can use a different SerDe.
 The data format can be enforced only on the broker side.
> 
>  Please, share your feedback.
> 
>  [1]
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
> >>
> >
>
>


Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Paul Whalen
of the current situation. Does my
> >> reply address your concerns?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> >> wrote:
> >> > > However, the record metadata is only defined when the parent
> forwards
> >> > > while processing a
> >> >
> >> > real record, not when it calls forward from the punctuator
> >> >
> >> >
> >> > Can we take a step back for a second...why wouldn't you be required to
> >> set
> >> > the RecordContext
> >> > yourself when calling forward from a Punctuator? I think I agree with
> >> Paul
> >> > here, it seems kind of
> >> > absurd not to enforce that the RecordContext be present inside the
> >> > process() method.
> >> >
> >> > The original problem with Punctuators, as I understood it, was that
> all
> >> of
> >> > the RecordContext
> >> > fields were exposed automatically to both the Processor and any
> >> Punctuator,
> >> > due to being
> >> > direct methods on the ProcessorContext. We can't control which
> >> > ProcessorContext methods
> >> > someone will call from with a Punctuator vs from a Processor. The best
> >> we
> >> > could do was
> >> > set these "nonsense" fields to null when inside a Punctuator, or set
> >> them
> >> > to some dummy
> >> > values as you pointed out.
> >> >
> >> > But then you proposed the solution of a separate RecordContext which
> is
> >> not
> >> > attached to the
> >> > ProcessorContext at all. This seemed to solve the above problem very
> >> > neatly: we only pass
> >> > in the RecordContext to the process() method, so we don't have to
> worry
> >> > about people trying
> >> > to access these fields from within a Punctuator. The fields aren't
> >> > accessible unless they're
> >> > defined.
> >> >
> >> > So what happens when someone wants to forward something from within a
> >> > Punctuator? I
> >> > don't think it's reasonable to let the timestamp field be undefined,
> >> ever.
> >> > What if the Punctuator
> >> > forwards directly to a sink, or directly to some windowing logic? Are
> we
> >> > supposed to add
> >> > handling for the RecordContext == null case to every processor? Or are
> >> we
> >> > just going to
> >> > assume the implicit restriction that users will only forward records
> >> from a
> >> > Punctuator to
> >> > downstream processors that know how to handle and/or set the
> >> RecordContext
> >> > if it's
> >> > undefined. That seems to throw away a lot of the awesome safety added
> in
> >> > this KIP
> >> >
> >> > Apologies for the rant. But I feel pretty strongly that allowing to
> >> forward
> >> > records from a
> >> > Punctuator without a defined RecordContext would be asking for
> trouble.
> >> > Imo, if you
> >> > want to forward from a Punctuator, you need to store the info you need
> >> in
> >> > order to
> >> > set the timestamp, or make one up yourself
> >> >
> >> > (the one alternative I can think of here is that maybe we could pass
> in
> >> the
> >> > current
> >> > partition time, so users can at least put in a reasonable estimate for
> >> the
> >> > timestamp
> >> > that won't cause it to get dropped and won't potentially lurch the
> >> > streamtime far into
> >> > the future. This would be similar to what we do in the
> >> TimestampExtractor)
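> >> >
> >> > (For concreteness, a sketch with the new API, where `key` and
> >> > `aggregate` are hypothetical state saved from earlier process() calls:
> >> >
> >> > ```java
> >> > context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
> >> >     ts -> context.forward(new Record<>(key, aggregate, ts)));
> >> > ```
> >> >
> >> > The point is just that the Record constructor forces an explicit
> >> > timestamp choice.)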
> >> >
> >> > On Tue, Sep 29, 2020 at 6:06 PM John Roesler 
> >> wrote:
> >> >
> >> > > Oh, I guess one other thing I should have mentioned is that I’ve
> >> recently
> >> > > discovered that in cases where the context is undefined, we
> currently
> >> just
> >> > > fill in dummy values for the context. So there’s a good chance that
> >> real
> >> > > applications in use are depending on undefined context without even
> >> > > realizing it. What I’m hoping to do is just make the situation
> >> ex

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-29 Thread Paul Whalen
ly and it's not possible to pass
> it to
> > > > a `punctuate` callback.
> > > >
> > > > For the stores and changelogging: I think there are two cases. (1)
> You
> > > > use a plain key-value store. For this case, it seems you do not care
> > > > about the timestamp and thus do not care what timestamp is set in
> the
> > > > changelog records. (We can set anything we want, as it's not
> relevant at
> > > > all -- the timestamp is ignored on read anyway.) (2) The other case
> is,
> > > > that one does care about timestamps, and for this case should use
> > > > TimestampedKeyValueStore. The passed timestamp will be set on the
> > > > changelog records for this case.
> > > >
> > > > Thus, for both cases, accessing the record context does not seem to
> be
> > > > a requirement. And providing access to the processor context to, e.g.,
> > > > `forward()` or similar seems safe.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > > Thanks for the reply, Paul!
> > > > >
> > > > > I certainly intend to make sure that the changelogging layer
> > > > > continues to work the way it does now, by hook or by crook.
> > > > > I think the easiest path for me is to just "cheat" and get
> > > > > the real ProcessorContext into the ChangeLoggingStore
> > > > > implementation somehow. I'll tag you on the PR when I create
> > > > > it, so you have an opportunity to express a preference about
> > > > > the implementation choice, and maybe even compile/test
> > > > > against it to make sure your stuff still works.
> > > > >
> > > > > Regarding this:
> > > > >
> > > > > > we have an interest in making a state store with a richer
> > > > > > way of querying its data (like perhaps getting all values
> > > > > > associated with a secondary key), while still ultimately
> > > > > > writing to the changelog topic for later restoration.
> > > > >
> > > > > This is very intriguing to me. On the side, I've been
> > > > > preparing a couple of ideas related to this topic. I don't
> > > > > think I have a coherent enough thought to even express it in
> > > > > a Jira right now, but when I do, I'll tag you on it also to
> > > > > see what you think.
> > > > >
> > > > > Whenever you're ready to share the usability improvement
> > > > > ideas, I'm very interested to see what you've come up with.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > > > > when you use a HashMap or RocksDB or other "state stores", you
> don't
> > > > > > > expect them to automatically know extra stuff about the record
> you're
> > > > > > > storing.
> > > > > >
> > > > > > > So, I don't think there is any reason we *can't* retain the
> record context
> > > > > > > in the StateStoreContext, and if any users came along with a
> clear use case
> > > > > > > I'd find that convincing.
> > > > > > >
> > > > > >
> > > > > > I agree with the principle of being conservative with the
> StateStoreContext
> > > > > > API.  Regarding user expectations or a clear use case, the only
> > > > > > counterpoint I would offer is that we sort of have that use case
> already,
> > > > > > which is the example I gave of the change logging store using the
> > > > > > timestamp.  I am curious if this functionality will be retained
> when using
> > > > > > built in state stores, or will a low-level processor get a
> KeyValueStore
> > > > > > that no longer writes to the changelog topic with the record's
> timestamp.
> > > > > > While I personally don't care much about that functionality
> specifically, I
> > > > > > have a general desire for custom state stores to easily do the
> things that
> > > > > > built in state stores do.
> > > >

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-10 Thread Paul Whalen
> store
> > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <
> vvcep...@apache.org
> > >
> > > > wrote:
> > > > > > > > Hi Paul,
> > > > > > > >
> > > > > > > > It's good to hear from you!
> > > > > > > >
> > > > > > > > I'm glad you're in favor of the direction. Especially when
> > > > > > > > it comes to public API and usability concens, I tend to
> > > > > > > > think that "the folks who matter" are actually the folks who
> > > > > > > > have to use the APIs to accomplish real tasks. It can be
> > > > > > > > hard for me to be sure I'm thinking clearly from that
> > > > > > > > perspective.
> > > > > > > >
> > > > > > > > Funny story, I also started down this road a couple of times
> > > > > > > > already and backed them out before the KIP because I was
> > > > > > > > afraid of the scope of the proposal. Unfortunately, needing
> > > > > > > > to make a new ProcessorContext kind of forced my hand.
> > > > > > > >
> > > > > > > > I see you've called me out about the ChangeLogging stores :)
> > > > > > > > In fact, I think these are the main/only reason that stores
> > > > > > > > might really need to invoke "forward()". My secret plan was
> > > > > > > > to cheat and either accomplish change-logging by a different
> > > > > > > > mechanism than implementing the store interface, or by just
> > > > > > > > breaking encapsulation to sneak the "real" ProcessorContext
> > > > > > > > into the ChangeLogging stores. But those are all
> > > > > > > > implementation details. I think the key question is whether
> > > > > > > > anyone else has a store implementation that needs to call
> > > > > > > > "forward()". It's not what you mentioned, but since you
> > > > > > > > spoke up, I'll just ask: if you have a use case for calling
> > > > > > > > "forward()" in a store, please share it.
> > > > > > > >
> > > > > > > > Regarding the other record-specific context methods, I think
> > > > > > > > you have a good point, but I also can't quite wrap my head
> > > > > > > > around how we can actually guarantee it to work in general.
> > > > > > > > For example, the case you cited, where the implementation of
> > > > > > > > `KeyValueStore#put(key, value)` uses the context to augment
> > > > > > > > the record with timestamp information. This relies on the
> > > > > > > > assumption that you would only call "put()" from inside a
> > > > > > > > `Processor#process(key, value)` call in which the record
> > > > > > > > being processed is the same record that you're trying to put
> > > > > > > > into the store.
> > > > > > > >
> > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > `range()` query and then update one of those records with
> > > > > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > > > > now, the Streams component that actually calls the Processor
> > > > > > > > takes care to set the right record context before invoking
> > > > > > > > the method, and in the case of caching, etc., it also takes
> > > > > > > > care to swap out the old context and keep it somewhere safe.
> > > > > > > > But when it comes to public API Processors calling methods
> > > > > > > > on StateStores, there's no opportunity for any component to
> > > > > > > > make sure the context is always correct.
> > > > > > > >
> > > > > > > > In the face of that situation, it seemed better to just move
> > > > > > > > in the direction of a "normal" data store. I.e., when you
> > > > > > > > use a HashMap or RocksDB or other "state sto

Re: [VOTE] KIP-478 Strongly Typed Streams Processor API

2020-09-09 Thread Paul Whalen
John,

It's exciting to see this KIP head in this direction!  In the last year or
so I've tried to sketch out some usability improvements for custom state
stores, and I also ended up splitting out the StateStoreContext from the
ProcessorContext in an attempt to facilitate what I was doing.  I sort of
abandoned it when I realized how large the ideal change might have to be,
but it's great to see that there is other interest in moving in this
direction (from the folks that matter :) ).

Having taken a stab at it myself, I have a comment/question on this bullet
about StateStoreContext:

It does *not*  include anything processor- or record- specific, like
> `forward()` or any information about the "current" record, which is only
> well-defined in the context of the Processor. Processors process one record
> at a time, but state stores may be used to store and fetch many records, so
> there is no "current record".
>

I totally agree that record-specific or processor-specific context in a
state store is often not well-defined and it would be good to separate that
out, but sometimes it (at least record-specific context) is actually
useful, for example, passing the record's timestamp through to the
underlying storage (or changelog topic):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121

You could have the writer client of the state store pass this through, but
it would be nice to be able to write state stores where the client did not
have this responsibility.  I'm not sure if the solution is to add some
things back to StateStoreContext, or make yet another context that
represents record-specific context while inside a state store.
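
To illustrate, a simplified and partly hypothetical rendering of the pattern
at that link (the real class wraps an inner store and uses internal context
types, so names here are not exact):

```java
// The change-logging layer pulls the current record's timestamp off the
// context when writing to the changelog topic.
public void put(final Bytes key, final byte[] value) {
    inner.put(key, value);
    // record-specific context used from inside a "store":
    context.logChange(name(), key, value, context.timestamp());
}
```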

Best,
Paul

On Wed, Sep 9, 2020 at 5:43 PM John Roesler  wrote:

> Hello all,
>
> I've been slowly pushing KIP-478 forward over the last year,
> and I'm happy to say that we're making good progress now.
> However, several issues with the original design have come
> to light.
>
> The major changes:
>
> We discovered that the original plan of just adding generic
> parameters to ProcessorContext was too disruptive, so we are
> now adding a new api.ProcessorContext.
>
> That choice forces us to add a new StateStore.init method
> for the new context, but ProcessorContext really isn't ideal
> for state stores to begin with, so I'm proposing a new
> StateStoreContext for this purpose. In a nutshell, there are
> quite a few methods in ProcessorContext that actually should
> never be called from inside a StateStore.
>
> Also, since there is a new ProcessorContext interface, we
> need a new MockProcessorContext implementation in the test-
> utils module.
>
>
>
> The changeset for the KIP document is here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
>
> And the KIP itself is here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
>
>
> If you have any concerns, please let me know!
>
> Thanks,
> -John
>
>


Re: KAFKA-10145

2020-06-10 Thread Paul Whalen
Perhaps I’m misunderstanding, but this looks like the cogroup feature: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup.
 Do you think that covers your use case?
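
For reference, cogrouping several streams into one aggregate looks roughly
like this (the Account type and the aggregators are hypothetical):

```java
// Two grouped streams folded into a single KTable via cogroup.
KTable<String, Account> accounts = orders.groupByKey()
    .cogroup((key, order, acct) -> acct.addOrder(order))
    .cogroup(payments.groupByKey(), (key, pmt, acct) -> acct.addPayment(pmt))
    .aggregate(Account::new);
```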

Paul

> On Jun 10, 2020, at 10:13 PM, lqjacklee  wrote:
> 
> Dear team,
> 
>   I have created the JIRA https://issues.apache.org/jira/browse/KAFKA-10145.
> I want to enhance the join feature to support multiple joins/aggregates
> for a stream. If anyone is interested or has questions or ideas, please let
> me know, thanks.


[jira] [Resolved] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name

2020-06-07 Thread Paul Whalen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Whalen resolved KAFKA-8177.

Resolution: Resolved

> Allow for separate connect instances to have sink connectors with the same 
> name
> ---
>
> Key: KAFKA-8177
> URL: https://issues.apache.org/jira/browse/KAFKA-8177
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>    Reporter: Paul Whalen
>Priority: Minor
>  Labels: connect
>
> If you have multiple Connect instances (either a single standalone or 
> distributed group of workers) running against the same Kafka cluster, the 
> connect instances cannot each have a sink connector with the same name and 
> still operate independently. This is because the consumer group ID used 
> internally for reading from the source topic(s) is entirely derived from the 
> connector's name: 
> [https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24]
> The documentation of Connect implies to me that it supports "multi-tenancy," 
> that is, as long as...
>  * In standalone mode, the {{offset.storage.file.filename}} is not shared 
> between instances
>  * In distributed mode, {{group.id}} and {{config.storage.topic}}, 
> {{offset.storage.topic}}, and {{status.storage.topic}} are not the same 
> between instances
> ... then the connect instances can operate completely independently without 
> fear of conflict.  But the sink connector consumer group naming policy makes 
> this untrue. Obviously this can be achieved by uniquely naming connectors 
> across instances, but in some environments that could be a bit of a nuisance, 
> or a challenging policy to enforce. For instance, imagine a large group of 
> developers or data analysts all running their own standalone Connect to load
> into a SQL database for their own analysis, or mirroring to
> their own local cluster for testing.
> The obvious solution is to allow supplying config that gives a Connect instance
> some notion of identity, and to use that when creating the sink task consumer 
> group. Distributed mode already has this obviously ({{group.id}}), but it 
> would need to be added for standalone mode. Maybe {{instance.id}}? Given that 
> solution it seems like this would need a small KIP.
> I could also imagine solving this problem through better documentation
> ("ensure your connector names are unique!"), but having that subtlety doesn't 
> seem worth it to me. (Optionally) assigning identity to every Connect 
> instance seems strictly more clear, without any downside.
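> For illustration, under that suggestion a standalone worker might set
> something like {{instance.id=alice}} (property name hypothetical), and the
> sink consumer group would then be derived from the instance id plus the
> connector name, e.g. {{connect-alice-myconnector}}, rather than from the
> connector name alone.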



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-470: TopologyTestDriver test input and output usability improvements

2019-09-20 Thread Paul Whalen
+1 (non-binding). I haven’t contributed to the discussion but I’ve been 
following - it’ll definitely make my team’s life easier. 

> On Sep 20, 2019, at 11:36 AM, Jukka Karvanen  
> wrote:
> 
> Hi all,
> 
> I would like to start vote on KIP-470:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> 
> 
> Regards,
> 
> Jukka


Re: [DISCUSS] Apache Kafka 2.4.0 release

2019-09-19 Thread Paul Whalen
Manikumar,

KIP-401 was accepted a few weeks ago and there is a PR pending review; can
it be included in the release as well?

Thanks,
Paul

On Mon, Sep 16, 2019 at 6:14 AM Manikumar  wrote:

> Hi All,
>
> Just a reminder that any new/pending KIP must pass vote by next Wednesday
> (Sep 25, 2019) to be included
> in the Apache Kafka 2.4.0 release.
>
> Also keep in mind that deadline for feature freeze is Oct 2, 2019.
> In order to be included in the release, major features/KIPs need to be
> merged and minor features need to
> have a PR ready.  Any feature/KIP not in this state will be automatically
> moved to the next release after Oct 2.
>
> There are a total of 84 open
> <
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%202.4.0%20AND%20status%20not%20in%20(resolved%2C%20closed)%20ORDER%20BY%20priority%20DESC%2C%20status%20DESC%2C%20updated%20DESC%20%20%20%20%20%20%20%20
> >
> JIRAs. Please update your assigned JIRAs, if you know they cannot make it
> to 2.4.0.
> There are also quite a few JIRAs related to flaky tests. We really
> appreciate any help on fixing these failing tests.
>
> Thanks,
> Manikumar
>
> On Mon, Sep 16, 2019 at 4:08 PM Manikumar 
> wrote:
>
> > Hi Mickael,
> >
> > Yes, we can include. Added KIP-396 to the wiki page for tracking.
> >
> >
> > Thanks,
> > Manikumar
> >
> > On Mon, Sep 16, 2019 at 3:36 PM Mickael Maison  >
> > wrote:
> >
> >> Hi Manikumar,
> >>
> >> Can we also include KIP-396?
> >> (
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484
> >> )
> >> It has been accepted and the PR is ready for review:
> >> https://github.com/apache/kafka/pull/7296
> >>
> >> Thanks
> >>
> >> On Mon, Sep 16, 2019 at 10:56 AM Manikumar 
> >> wrote:
> >> >
> >> > Hi Viktor,
> >> >
> >> > Yes, we can include KIP-434.
> >> >
> >> > Thanks,
> >> >
> >> > On Mon, Sep 16, 2019 at 3:09 PM Viktor Somogyi-Vass <
> >> viktorsomo...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Manikumar,
> >> > >
> >> > > Can we please also include KIP-434?
> >> > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> >> > > It has been accepted and there is already a pull request under
> review.
> >> > >
> >> > > Thanks,
> >> > > Viktor
> >> > >
> >> > > On Fri, Sep 6, 2019 at 9:59 AM Manikumar  >
> >> > > wrote:
> >> > >
> >> > > > Hi David,
> >> > > >
> >> > > > Yes, we can include KIP-511.  KIP must be accepted by KIP Freeze
> >> date
> >> > > (Sep
> >> > > > 25, 2019 )
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > >
> >> > > > On Fri, Sep 6, 2019 at 12:53 PM David Jacot 
> >> wrote:
> >> > > >
> >> > > > > Hi Manikumar,
> >> > > > >
> >> > > > > Could we add KIP-511 to the plan? I think it will make it.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > David
> >> > > > >
> >> > > > > On Tue, Aug 27, 2019 at 5:32 PM Manikumar <
> >> manikumar.re...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi all,
> >> > > > > >
> >> > > > > > I put together a draft release plan with Oct 2019 as the
> release
> >> > > month
> >> > > > > and
> >> > > > > > a list of KIPs that have already been voted:
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125307901
> >> > > > > >
> >> > > > > > Here are the dates:
> >> > > > > >
> >> > > > > > 1) KIP Freeze:  Sep 25, 2019 (A KIP must be accepted by this
> >> date in
> >> > > > > order
> >> > > > > > to be considered for this release)
> >> > > > > >
> >> > > > > > 2) Feature Freeze:  Oct 2, 2019 (Major features merged &
> >> working on
> >> > > > > > stabilization, minor features have PR,
> >> > > > > >  release branch cut; anything not in this state will be
> >> automatically
> >> > > > > moved
> >> > > > > > to the next release in JIRA.
> >> > > > > >
> >> > > > > > 3) Code Freeze:  Oct 16, 2019
> >> > > > > >
> >> > > > > > 4) Release Date: Oct 30, 2019 (tentative)
> >> > > > > >
> >> > > > > > Please plan accordingly for the features you want push into
> >> Apache
> >> > > > Kafka
> >> > > > > > 2.4.0 release.
> >> > > > > >
> >> > > > > > Regards,
> >> > > > > > Manikumar
> >> > > > > >
> >> > > > > > On Mon, Aug 12, 2019 at 9:08 PM Ismael Juma <
> ism...@juma.me.uk>
> >> > > wrote:
> >> > > > > >
> >> > > > > > > Thanks for volunteering Manikumar. +1
> >> > > > > > >
> >> > > > > > > Ismael
> >> > > > > > >
> >> > > > > > > On Mon, Aug 12, 2019 at 7:54 AM Manikumar <
> >> > > manikumar.re...@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi all,
> >> > > > > > > >
> >> > > > > > > > I would like to volunteer to be the release manager for
> our
> >> next
> >> > > > > > > time-based
> >> > > > > > > > feature release (v2.4.0).
> >> > > > > > > >
> >> > > > > > > > If that sounds good, I'll post the release plan over the
> >> next few
> >> > > > > days.
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > 

Re: [VOTE] KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting

2019-09-14 Thread Paul Whalen
With 3 binding votes and 1 non-binding, the vote is closed and the KIP is
accepted.

I'm just wrapping up a first draft of a PR here:
https://github.com/apache/kafka/pull/6824

Thanks!
Paul

On Thu, Sep 5, 2019 at 2:17 PM Paul Whalen  wrote:

> Thanks all! I updated the KIP status, and will get to the remaining TODOs
> in my PR hopefully this weekend.
>
> > On Sep 3, 2019, at 6:16 PM, Guozhang Wang  wrote:
> >
> > Hi Paul, thanks for the confirmation!
> >
> > Since we have three binding votes now I think you can proceed and mark it
> > as accepted.
> >
> >> On Tue, Sep 3, 2019 at 3:17 PM Paul Whalen  wrote:
> >>
> >> Yeah, agreed on it being the same reference. That’s the way I have it in
> >> the working PR and I’ll update the KIP for clarity.
> >>
> >>>> On Sep 3, 2019, at 5:04 PM, Matthias J. Sax 
> >>> wrote:
> >>>
> >>> I am strongly in favor of "must be the same reference".
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>> On 9/3/19 2:09 PM, Guozhang Wang wrote:
> >>>> Hi Paul,
> >>>>
> >>>> Thanks for the KIP! +1 (binding).
> >>>>
> >>>> One minor comment about the following:
> >>>>
> >>>> "In order to solve the problem of addStateStore potentially being
> called
> >>>> twice for the same store (because more than one Supplier specifies
> it),
> >> the
> >>>> check for duplicate stores in addStateStores will be relaxed to *allow
> >> for
> >>>> duplicates if the same StoreBuilder instance for the same store
> name*."
> >>>>
> >>>> It's worth clarifying how we should check if the StoreBuilder instances
> >> are
> >>>> the same: either 1) equality by reference or 2) equality based on e.g.
> >>>> an #equals override function, so that two different instances may still be
> >>>> considered "equal". I think you meant 1), just wanted to confirm :)
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>> On Thu, Aug 29, 2019 at 3:14 PM Paul Whalen 
> >> wrote:
> >>>>>
> >>>>> Thanks for the votes all! With two binding votes we’re in need of one
> >> more
> >>>>> for the KIP to be accepted. With the 2.4 release coming in September,
> >> it
> >>>>> would be great to get another committer to take a look soon so I
> could
> >> set
> >>>>> aside some time to get implementation/documentation done to make it
> >> into
> >>>>> the release.
> >>>>>
> >>>>> Thanks,
> >>>>> Paul
> >>>>>
> >>>>>> On Aug 20, 2019, at 5:47 PM, Bill Bejeck  wrote:
> >>>>>>
> >>>>>> Thanks for the KIP.
> >>>>>>
> >>>>>> +1 (binding)
> >>>>>>
> >>>>>> On Tue, Aug 20, 2019 at 6:28 PM Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> +1 (binding)
> >>>>>>>
> >>>>>>>
> >>>>>>>> On 6/17/19 2:32 PM, John Roesler wrote:
> >>>>>>>> I'm +1 (nonbinding) on the current iteration of the proposal.
> >>>>>>>>
> >>>>>>>>> On Mon, May 27, 2019 at 1:58 PM Paul Whalen 
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> I spoke too early a month ago, but I believe the proposal is
> >> finalized
> >>>>>>> now
> >>>>>>>>> and ready for voting.
> >>>>>>>>>
> >>>>>>>>> KIP:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >>>>>>>>>
> >>>>>>>>> Discussion:
> >>
> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
> >>>>>>>>>
> >>>>>>>>> Pull request (still a WIP, obviously):
> >>>>>>>>> https://github.com/apache/kafka/pull/6824
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Paul
> >>>>>>>>>
> >>>>>>>>>> On Wed, Apr 24, 2019 at 8:00 PM Paul Whalen  >
> >>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> After some good discussion on and adjustments to KIP-401 (which
> I
> >>>>>>> renamed
> >>>>>>>>>> slightly for clarity), chatter has died down so I figured I may
> as
> >>>>> well
> >>>>>>>>>> start a vote.
> >>>>>>>>>>
> >>>>>>>>>> KIP:
> >>>>>>>>>> TransformerSupplier/ProcessorSupplier StateStore connecting
> >>>>>>>>>> <
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756>
> >>>>>>>>>> Discussion:
> >>
> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>> Paul
> >
> >
> > --
> > -- Guozhang
>


Re: [VOTE] KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting

2019-09-05 Thread Paul Whalen
Thanks all! I updated the KIP status, and will get to the remaining TODOs in my 
PR hopefully this weekend. 

> On Sep 3, 2019, at 6:16 PM, Guozhang Wang  wrote:
> 
> Hi Paul, thanks for the confirmation!
> 
> Since we have three binding votes now I think you can proceed and mark it
> as accepted.
> 
>> On Tue, Sep 3, 2019 at 3:17 PM Paul Whalen  wrote:
>> 
>> Yeah, agreed on it being the same reference. That’s the way I have it in
>> the working PR and I’ll update the KIP for clarity.
>> 
>>>> On Sep 3, 2019, at 5:04 PM, Matthias J. Sax 
>>> wrote:
>>> 
>>> I am strongly in favor of "must be the same reference".
>>> 
>>> 
>>> -Matthias
>>> 
>>>> On 9/3/19 2:09 PM, Guozhang Wang wrote:
>>>> Hi Paul,
>>>> 
>>>> Thanks for the KIP! +1 (binding).
>>>> 
>>>> One minor comment about the following:
>>>> 
>>>> "In order to solve the problem of addStateStore potentially being called
>>>> twice for the same store (because more than one Supplier specifies it),
>> the
>>>> check for duplicate stores in addStateStores will be relaxed to *allow
>> for
>>>> duplicates if the same StoreBuilder instance for the same store name*."
>>>> 
>>>> It's worth clarifying how we should check if the StoreBuilder instances
>> are
>>>> the same: either 1) equality by reference or 2) equality based on e.g.
>>>> an #equals override function, so that two different instances may still be
>>>> considered "equal". I think you meant 1), just wanted to confirm :)
>>>> 
>>>> 
>>>> Guozhang
>>>> 
>>>>> On Thu, Aug 29, 2019 at 3:14 PM Paul Whalen 
>> wrote:
>>>>> 
>>>>> Thanks for the votes all! With two binding votes we’re in need of one
>> more
>>>>> for the KIP to be accepted. With the 2.4 release coming in September,
>> it
>>>>> would be great to get another committer to take a look soon so I could
>> set
>>>>> aside some time to get implementation/documentation done to make it
>> into
>>>>> the release.
>>>>> 
>>>>> Thanks,
>>>>> Paul
>>>>> 
>>>>>> On Aug 20, 2019, at 5:47 PM, Bill Bejeck  wrote:
>>>>>> 
>>>>>> Thanks for the KIP.
>>>>>> 
>>>>>> +1 (binding)
>>>>>> 
>>>>>> On Tue, Aug 20, 2019 at 6:28 PM Matthias J. Sax <
>> matth...@confluent.io>
>>>>>> wrote:
>>>>>> 
>>>>>>> +1 (binding)
>>>>>>> 
>>>>>>> 
>>>>>>>> On 6/17/19 2:32 PM, John Roesler wrote:
>>>>>>>> I'm +1 (nonbinding) on the current iteration of the proposal.
>>>>>>>> 
>>>>>>>>> On Mon, May 27, 2019 at 1:58 PM Paul Whalen 
>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> I spoke too early a month ago, but I believe the proposal is
>> finalized
>>>>>>> now
>>>>>>>>> and ready for voting.
>>>>>>>>> 
>>>>>>>>> KIP:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>>>>> 
>>>>>>>>> Discussion:
>> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>>>>>>>>> 
>>>>>>>>> Pull request (still a WIP, obviously):
>>>>>>>>> https://github.com/apache/kafka/pull/6824
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Paul
>>>>>>>>> 
>>>>>>>>>> On Wed, Apr 24, 2019 at 8:00 PM Paul Whalen 
>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi all,
>>>>>>>>>> 
>>>>>>>>>> After some good discussion on and adjustments to KIP-401 (which I
>>>>>>> renamed
>>>>>>>>>> slightly for clarity), chatter has died down so I figured I may as
>>>>> well
>>>>>>>>>> start a vote.
>>>>>>>>>> 
>>>>>>>>>> KIP:
>>>>>>>>>> TransformerSupplier/ProcessorSupplier StateStore connecting
>>>>>>>>>> <
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756>
>>>>>>>>>> Discussion:
>> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>>>>>>>>>> 
>>>>>>>>>> Thanks!
>>>>>>>>>> Paul
> 
> 
> -- 
> -- Guozhang


Re: [VOTE] KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting

2019-09-03 Thread Paul Whalen
Yeah, agreed on it being the same reference. That’s the way I have it in the 
working PR and I’ll update the KIP for clarity. 

> On Sep 3, 2019, at 5:04 PM, Matthias J. Sax  wrote:
> 
> I am strongly in favor of "must be the same reference".
> 
> 
> -Matthias
> 
>> On 9/3/19 2:09 PM, Guozhang Wang wrote:
>> Hi Paul,
>> 
>> Thanks for the KIP! +1 (binding).
>> 
>> One minor comment about the following:
>> 
>> "In order to solve the problem of addStateStore potentially being called
>> twice for the same store (because more than one Supplier specifies it), the
>> check for duplicate stores in addStateStores will be relaxed to *allow for
>> duplicates if the same StoreBuilder instance for the same store name*."
>> 
>> It's worth clarifying how we should check if the StoreBuilder instances are
>> the same: either 1) equality by reference or 2) equality based on e.g.
>> an #equals override function, so that two different instances may still be
>> considered "equal". I think you meant 1), just wanted to confirm :)
>> 
>> 
>> Guozhang
>> 
>>> On Thu, Aug 29, 2019 at 3:14 PM Paul Whalen  wrote:
>>> 
>>> Thanks for the votes all! With two binding votes we’re in need of one more
>>> for the KIP to be accepted. With the 2.4 release coming in September, it
>>> would be great to get another committer to take a look soon so I could set
>>> aside some time to get implementation/documentation done to make it into
>>> the release.
>>> 
>>> Thanks,
>>> Paul
>>> 
>>>> On Aug 20, 2019, at 5:47 PM, Bill Bejeck  wrote:
>>>> 
>>>> Thanks for the KIP.
>>>> 
>>>> +1 (binding)
>>>> 
>>>> On Tue, Aug 20, 2019 at 6:28 PM Matthias J. Sax 
>>>> wrote:
>>>> 
>>>>> +1 (binding)
>>>>> 
>>>>> 
>>>>>> On 6/17/19 2:32 PM, John Roesler wrote:
>>>>>> I'm +1 (nonbinding) on the current iteration of the proposal.
>>>>>> 
>>>>>>> On Mon, May 27, 2019 at 1:58 PM Paul Whalen 
>>> wrote:
>>>>>>> 
>>>>>>> I spoke too early a month ago, but I believe the proposal is finalized
>>>>> now
>>>>>>> and ready for voting.
>>>>>>> 
>>>>>>> KIP:
>>>>>>> 
>>>>> 
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>>> 
>>>>>>> Discussion:
>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>>>>>>> 
>>>>>>> Pull request (still a WIP, obviously):
>>>>>>> https://github.com/apache/kafka/pull/6824
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Paul
>>>>>>> 
>>>>>>>> On Wed, Apr 24, 2019 at 8:00 PM Paul Whalen 
>>> wrote:
>>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> After some good discussion on and adjustments to KIP-401 (which I
>>>>> renamed
>>>>>>>> slightly for clarity), chatter has died down so I figured I may as
>>> well
>>>>>>>> start a vote.
>>>>>>>> 
>>>>>>>> KIP:
>>>>>>>> TransformerSupplier/ProcessorSupplier StateStore connecting
>>>>>>>> <
>>>>> 
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756>
>>>>>>>> Discussion:
>>>>>>>> 
>>>>>>>> 
>>>>> 
>>> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>>>>>>>> 
>>>>>>>> Thanks!
>>>>>>>> Paul
>>>>>>>> 
>>>>> 
>>>>> 
>>> 
>> 
>> 
> 


Re: [VOTE] KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting

2019-08-29 Thread Paul Whalen
Thanks for the votes all! With two binding votes we’re in need of one more for 
the KIP to be accepted. With the 2.4 release coming in September, it would be 
great to get another committer to take a look soon so I could set aside some 
time to get implementation/documentation done to make it into the release.

Thanks,
Paul

> On Aug 20, 2019, at 5:47 PM, Bill Bejeck  wrote:
> 
> Thanks for the KIP.
> 
> +1 (binding)
> 
> On Tue, Aug 20, 2019 at 6:28 PM Matthias J. Sax 
> wrote:
> 
>> +1 (binding)
>> 
>> 
>>> On 6/17/19 2:32 PM, John Roesler wrote:
>>> I'm +1 (nonbinding) on the current iteration of the proposal.
>>> 
>>>> On Mon, May 27, 2019 at 1:58 PM Paul Whalen  wrote:
>>>> 
>>>> I spoke too early a month ago, but I believe the proposal is finalized
>> now
>>>> and ready for voting.
>>>> 
>>>> KIP:
>>>> 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>> 
>>>> Discussion:
>>>> 
>> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>>>> 
>>>> Pull request (still a WIP, obviously):
>>>> https://github.com/apache/kafka/pull/6824
>>>> 
>>>> Thanks,
>>>> Paul
>>>> 
>>>>> On Wed, Apr 24, 2019 at 8:00 PM Paul Whalen  wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> After some good discussion on and adjustments to KIP-401 (which I
>> renamed
>>>>> slightly for clarity), chatter has died down so I figured I may as well
>>>>> start a vote.
>>>>> 
>>>>> KIP:
>>>>> TransformerSupplier/ProcessorSupplier StateStore connecting
>>>>> <
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756>
>>>>> Discussion:
>>>>> 
>>>>> 
>> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>>>>> 
>>>>> Thanks!
>>>>> Paul
>>>>> 
>> 
>> 


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-15 Thread Paul Whalen
I updated the KIP (and PR) to relax the restriction on connecting state
stores via either means; it definitely makes sense to me at this point.
I'd love to hear if there are any other concerns or broad objections to the
KIP.
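
For anyone following along, usage under the current proposal looks roughly
like the sketch below (the store name and counting logic are just for
illustration; the stores() method is as proposed in the KIP):

```java
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// The supplier declares its own store via stores(); the topology adds and
// connects it automatically, with no separate addStateStore() call.
class CountSupplier implements ProcessorSupplier<String, String> {
    static final String STORE = "counts";  // illustrative store name

    @Override
    public Set<StoreBuilder<?>> stores() {
        StoreBuilder<KeyValueStore<String, Long>> builder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE),
                Serdes.String(), Serdes.Long());
        return Collections.singleton(builder);
    }

    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            private KeyValueStore<String, Long> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore(STORE);
            }

            @Override
            public void process(final String key, final String value) {
                final Long count = store.get(key);
                store.put(key, count == null ? 1L : count + 1);
            }

            @Override
            public void close() { }
        };
    }
}
```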

Paul

On Thu, Aug 8, 2019 at 10:12 PM Paul Whalen  wrote:

> Matthias,
>
> You did summarize my thinking correctly, thanks for writing it out.  I
> think the disconnect on opinion is due to a couple things influenced by my
> habits while writing streams code:
>
> 1) I don't see state stores that are "individually owned" versus "shared"
> as that much different at all, at least from the perspective of the
> business logic for the Processor. So it is actually a negative to separate
> the connecting of stores, because it appears in the topology wiring that
> fewer stores are being used by the Processor than actually are.  A reader
> might assume that the Processor doesn't need other state to do its job
> which could cause confusion.
> 2) In practice, my addProcessor() and addStateStore() (or
> builder.addStateStore() and stream.process() ) calls are very near each
> other anyway, so the shared dependency on StoreBuilder is not a burden;
> passing the same object could even bring clarity to the idea that the store
> is shared and not individually owned.
>
> Hearing your thoughts though, I think I have imposed a bit too much of my
> own style and assumptions on the API, especially with the shared dependency
> on a single StoreBuilder and thoughts about store ownership/sharing.  I'm
> going to update the KIP since the one +1 vote comes from John, who is in
> favor of relaxing the restriction anyway.
>
> Paul
>
> On Wed, Aug 7, 2019 at 11:11 PM Matthias J. Sax 
> wrote:
>
>> I am not sure if I fully understand, hence I will try to rephrase:
>>
>> > I can't think of an example that would require both ways, or would
>> > even be more readable using both ways.
>>
>> Example:
>>
>> There are two processors, A and B, one store S that both need to
>> access, and one store S_b that only B needs to access:
>>
>> If we don't allow to mix both approaches, it would be required to write
>> the following code:
>>
>>   Topology t = new Topology();
>>   t.addProcessor("A", ...); // does not add any store
>>   t.addProcessor("B", ...); // does not add any store
>>   t.addStateStore(..., "A", "B"); // adds S and connect it to A and B
>>   t.addStateStore(..., "B"); // adds S_b and connect it to B
>>
>> // DSL example:
>>
>>   StreamsBuilder b = new StreamsBuilder();
>>   b.addStateStore() // adds S
>>   b.addStateStore() // adds S_b
>>   stream1.process(..., "S") // add A and connect S
>>   stream2.process(..., "S", "S_b") // add B and connect S and S_b
>>
>>
>> If we allow mixing both approaches, the code could be (simplified to):
>>
>>   Topology t = new Topology();
>>   t.addProcessor("A", ...); // does not add any store
>>   t.addProcessor("B", ...); // adds/connects S_b implicitly
>>   t.addStateStore(..., "A", "B"); // adds S and connect it to A and B
>>
>> // DSL example
>>
>>   StreamsBuilder b = new StreamsBuilder();
>>   b.addStateStore() // adds S
>>   stream1.process(..., "S") // add A and connect S
>>   stream2.process(..., "S") // add B and connect S; adds/connects S_b
>> implicitly
>>
>> The fact that B has a "private store" could be encapsulated and I don't
>> see why this would be bad?
>>
>> > If you can
>> > do both ways, the actual full set of state stores being connected could
>> be
>> > in wildly different places in the code, which could create confusion.
>>
>> I.e., I don't see why the second version would be confusing, or why the
>> first version would be more readable (I don't argue it's less readable
>> either though; I think both are equally readable)?
>>
>>
>>
>> Or do you argue that we should allow the following:
>>
>> > Shared stores can be passed from
>> > the outside in an anonymous ProcessorSupplier if desired, making it
>> > effectively the same as passing the stateStoreNames var args
>>
>>   Topology t = new Topology();
>>   t.addProcessor("A", ...); // adds/connects S implicitly
>>   t.addProcessor("B", ...); // adds/connects S and S_b implicitly
>>
>> // DSL example
>>
>>   StreamsBuilder b = new StreamsBuilder();
>>

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-08 Thread Paul Whalen
Matthias,

You did summarize my thinking correctly, thanks for writing it out.  I
think the disconnect on opinion is due to a couple things influenced by my
habits while writing streams code:

1) I don't see state stores that are "individually owned" versus "shared"
as that much different at all, at least from the perspective of the
business logic for the Processor. So it is actually a negative to separate
the connecting of stores, because it appears in the topology wiring that
fewer stores are being used by the Processor than actually are.  A reader
might assume that the Processor doesn't need other state to do its job
which could cause confusion.
2) In practice, my addProcessor() and addStateStore() (or
builder.addStateStore() and stream.process() ) calls are very near each
other anyway, so the shared dependency on StoreBuilder is not a burden;
passing the same object could even bring clarity to the idea that the store
is shared and not individually owned.
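
To illustrate the sharing pattern concretely (SupplierA and SupplierB are
hypothetical suppliers that would expose the builder through the proposed
mechanism):

```java
// Both suppliers hold the same StoreBuilder instance, which under the
// proposal makes the duplicate registration legal and makes the sharing
// explicit at the wiring site.
StoreBuilder<KeyValueStore<String, Long>> shared = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("shared-store"),
    Serdes.String(), Serdes.Long());

stream1.process(new SupplierA(shared));
stream2.process(new SupplierB(shared));
```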

Hearing your thoughts though, I think I have imposed a bit too much of my
own style and assumptions on the API, especially with the shared dependency
on a single StoreBuilder and thoughts about store ownership/sharing.  I'm
going to update the KIP since the one +1 vote comes from John, who is in
favor of relaxing the restriction anyway.

Paul

On Wed, Aug 7, 2019 at 11:11 PM Matthias J. Sax 
wrote:

> I am not sure if I fully understand, hence I will try to rephrase:
>
> > I can't think of an example that would require both ways, or would
> > even be more readable using both ways.
>
> Example:
>
> There are two processors, A and B, one store S that both need to
> access, and one store S_b that only B needs to access:
>
> If we don't allow to mix both approaches, it would be required to write
> the following code:
>
>   Topology t = new Topology();
>   t.addProcessor("A", ...); // does not add any store
>   t.addProcessor("B", ...); // does not add any store
>   t.addStateStore(..., "A", "B"); // adds S and connect it to A and B
>   t.addStateStore(..., "B"); // adds S_b and connect it to B
>
> // DSL example:
>
>   StreamsBuilder b = new StreamsBuilder();
>   b.addStateStore() // adds S
>   b.addStateStore() // adds S_b
>   stream1.process(..., "S") // add A and connect S
>   stream2.process(..., "S", "S_b") // add B and connect S and S_b
>
>
> If we allow mixing both approaches, the code could be (simplified to):
>
>   Topology t = new Topology();
>   t.addProcessor("A", ...); // does not add any store
>   t.addProcessor("B", ...); // adds/connects S_b implicitly
>   t.addStateStore(..., "A", "B"); // adds S and connect it to A and B
>
> // DSL example
>
>   StreamsBuilder b = new StreamsBuilder();
>   b.addStateStore() // adds S
>   stream1.process(..., "S") // add A and connect S
>   stream2.process(..., "S") // add B and connect S; adds/connects S_b
> implicitly
>
> The fact that B has a "private store" could be encapsulated and I don't
> see why this would be bad?
>
> > If you can
> > do both ways, the actual full set of state stores being connected could
> be
> > in wildly different places in the code, which could create confusion.
>
> I.e., I don't see why the second version would be confusing, or why the
> first version would be more readable (I don't argue it's less readable
> either though; I think both are equally readable)?
>
>
>
> Or do you argue that we should allow the following:
>
> > Shared stores can be passed from
> > the outside in an anonymous ProcessorSupplier if desired, making it
> > effectively the same as passing the stateStoreNames var args
>
>   Topology t = new Topology();
>   t.addProcessor("A", ...); // adds/connects S implicitly
>   t.addProcessor("B", ...); // adds/connects S and S_b implicitly
>
> // DSL example
>
>   StreamsBuilder b = new StreamsBuilder();
>   stream1.process(...) // add A and add/connect S implicitly
>   stream2.process(...) // add B and add/connect S and S_b implicitly
>
> For this case, the second implicit adding of S would require returning
> the same `StoreBuilder` instance to make it idempotent, which seems hard
> to achieve, because both `ProcessorSupplier`s now have a cross
> dependency to use the same object.
>
> Hence, I don't think this would be a good approach.
>
>
> Also, because we require that the same `StoreBuilder` instance is always
> passed for a unique store name, we actually have good protection against
> user bugs that would add two stores with the same name but different builders
>

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Paul Whalen
except for that
> > check.
> >
> > One last thought, regarding the all-important interface name: If you
> > wanted to indicate more that the stores are available for Streams to
> > connect, rather than that they are already connected, you could call
> > it ConnectableStoreProvider (similar to AutoCloseable).
> >
> > I just thought I'd summarize the current state, since it's been a
> > while and no one has voted yet. I'll go ahead and vote now on the
> > voting thread, since I'm +1 on the current proposal.
> >
> > Thanks,
> > -John
> >
> > On Mon, May 27, 2019 at 1:59 PM Paul Whalen  wrote:
> >>
> >> It wasn't much of a lift changing option B to work for option C, so I
> >> closed that PR and made a new one, which should be identical to the KIP
> >> right now: https://github.com/apache/kafka/pull/6824.  There are a few
> >> todos still which I will hold off until the KIP is accepted.
> >>
> >> I created a voting thread about a month ago, so I'll bump that now that
> >> we're nearly there.
> >>
> >> Paul
> >>
> >> On Sun, May 26, 2019 at 2:21 PM Paul Whalen  wrote:
> >>
> >>> Per Matthias's suggestion from a while ago, I actually implemented a
> good
> >>> amount of option B to get a sense of the user experience and
> documentation
> >>> requirements.  For a few reasons mentioned below, I think it's not my
> >>> favorite option, and I prefer option C.  But since I did the work and
> it
> >>> can help discussion, I may as well share:
> >>> https://github.com/apache/kafka/pull/6821.
> >>>
> >>> Things I learned along the way implementing Option B:
> >>>  - For the name of the interface, I like ConnectedStoreProvider.  It
> isn't
> >>> perfect but it seems to capture the general gist without being overly
> >>> verbose.  I get that from a strict standpoint it's not "providing
> connected
> >>> stores" but is instead "providing stores to be connected," but I think
> that
> >>> in context and with documentation, the risk of someone being confused
> by
> >>> that is low.
> >>>  - I definitely felt the discoverability issue while trying to write
> clear
> >>> documentation; you really have to make sure to connect the dots for the
> >>> user when the interface isn't connected to anything.
> >>>  - Another problem with a separate interface found while writing
> >>> tests/examples: defining a ProcessorSupplier that also implements
> >>> ConnectedStoreProvider cannot be done anonymously, since you can't
> define
> >>> an anonymous class in Java that implements multiple interfaces.  I
> actually
> >>> consider this a fairly major usability issue - it means a user always
> has
> >>> to have a custom class rather than doing it inline.  We could provide
> an
> >>> abstract class that implements the two, but at that point, we're not
> that
> >>> far from option A or C anyway.
> >>>
> >>> I updated the KIP with my current thinking, which as mentioned is
> >>> Matthias's option C.  Once again for clarity, that *is not* what is in
> the
> >>> linked pull request.  The current KIP is my proposal.
> >>>
> >>> Thanks everyone for the input!
> >>>
> >>> P.S.  What do folks use to edit the HTML documentation, e.g.
> >>> processor-api.html?  I looked at doing it by hand but it kind of
> looked
> >>> like agony with all the small tags required for formatting code, so I'm
> >>> sort of assuming there's tooling for it.
> >>>
> >>> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> I think the discussion mixed approaches a little bit, hence, let me
> >>>> rephrase my understanding:
> >>>>
> >>>>
> >>>> A) add new method with default implementation to `ProcessorSupplier`:
> >>>>
> >>>> For this case, we don't add a new interface, but only add a new method
> >>>> to `ProcessorSupplier` -- to keep backward compatibility, we need to
> add
> >>>> a default implementation. Users opt into the new feature by
> overwriting
> >>>> the default implementation.
> >>>>
> >>>>

Re: [DISCUSS] KIP-478 Strongly Typed Processor API

2019-07-07 Thread Paul Whalen
First of all, +1 on the whole idea, my team has run into (admittedly minor,
but definitely annoying) issues because of the weaker typing.  We're heavy
users of the PAPI and have Processors that, while not hundreds of lines
long, are certainly quite hefty and call context.forward() in many places.

After reading the KIP and discussion a few times, I've convinced myself
that any initial concerns I had aren't really concerns at all (state store
types, for one).  One thing I will mention:  changing *Transformer* to have
ProcessorContext gave me pause, because I have code that does
context.forward in transformers.  Now that "flat transform" is a specific
part of the API it seems okay to steer folks in that direction (to never
use context.forward in a transformer), but it should be called out
explicitly in javadocs.  Currently Transformer (which is used for both
transform() and flatTransform()) doesn't really call out the ambiguity:
https://github.com/apache/kafka/blob/ca641b3e2e48c14ff308181c775775408f5f35f7/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java#L75-L77,
and for migrating users (from before flatTransform) it could be confusing.
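
To make the ambiguity concrete, here is a hedged sketch (the class name and
types are mine, not from the KIP) of a Transformer that emits through both
paths today:

  class DualOutputTransformer implements Transformer<String, Long, KeyValue<String, Long>> {
      private ProcessorContext context;

      @Override
      public void init(ProcessorContext context) {
          this.context = context;
      }

      @Override
      public KeyValue<String, Long> transform(String key, Long value) {
          context.forward(key, value + 1);   // extra output via the context
          return KeyValue.pair(key, value);  // "primary" output via the return value
      }

      @Override
      public void close() { }
  }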

Side note, I'd like to plug KIP-401 (there is a discussion thread and a
voting thread) which also relates to using the PAPI.  It seems like there
is some interest and it is in a votable state with the majority of
implementation complete.

Paul

On Fri, Jun 28, 2019 at 2:02 PM Bill Bejeck  wrote:

> Sorry for coming late to the party.
>
> As for the naming I'm in favor of RecordProcessor as well.
>
> I agree that we should not take on doing all of the package movements as
> part of this KIP, especially as John has pointed out, it will be an
> opportunity to discuss some clean-up on individual classes which I envision
> becoming another somewhat involved process.
>
> For the end goal, if possible, here's what I propose.
>
>    1. We keep the scope of the KIP the same, *but we only implement it in
>    phases*
>    2. Phase one could include what Guozhang had proposed earlier, namely:
>       1.a) modifying ProcessorContext only with the output types on
>       forward;
>       1.b) modifying Transformer signature to have generics of
>       ProcessorContext, and then lift the restriction of not using
>       punctuate: if users did not follow the enforced typing and just
>       code without generics, they will get a warning at compile time and
>       a run-time error if they forward wrong-typed records, which I
>       think would be acceptable.
>    3. Then we could tackle other pieces in an incremental manner as we see
>       what makes sense
>
> Just my 2cents
>
> -Bill
>
> On Mon, Jun 24, 2019 at 10:22 PM Guozhang Wang  wrote:
>
> > Hi John,
> >
> > Yeah I think we should not do all the repackaging as part of this KIP as
> > well (we can just do the movement of the Processor / ProcessorSupplier),
> > but I think we need to discuss the end goal here since otherwise we may
> do
> > the repackaging of Processor in this KIP, but only later on realizing
> that
> > other re-packagings are not our favorite solutions.
> >
> >
> > Guozhang
> >
> > On Mon, Jun 24, 2019 at 3:06 PM John Roesler  wrote:
> >
> > > Hey Guozhang,
> > >
> > > Thanks for the idea! I'm wondering if we could take a middle ground
> > > and take your proposed layout as a "roadmap", while only actually
> > > moving the classes that are already involved in this KIP.
> > >
> > > The reason I ask is not just to control the scope of this KIP, but
> > > also, I think that if we move other classes to new packages, we might
> > > also want to take the opportunity to clean up other things about them.
> > > But each one of those would become a discussion point of its own, so
> > > it seems the discussion would become intractable. FWIW, I do like your
> > > idea for precisely this reason, it creates opportunities for us to
> > > consider other changes that we are simply not able to make without
> > > breaking source compatibility.
> > >
> > > If the others feel "kind of favorable" with this overall vision, maybe
> > > we can make one or more Jira tickets to capture it, and then just
> > > alter _this_ proposal to `processor.api.Processor` (etc).
> > >
> > > WDYT?
> > > -John
> > >
> > > On Sun, Jun 23, 2019 at 7:17 PM Guozhang Wang 
> > wrote:
> > > >
> > > > Hello John,
> > > >
> > > > Thanks for your detailed explanation, I've done some quick checks on
> > some
> > > > existing examples that heavily used Processor and the results also
> > make
> > > me
> > > > worried about my previous statements that "the breakage would not be
> > > big".
> > > > I agree we should maintain compatibility.
> > > >
> > > > About the naming itself, I'm actually a bit inclined into
> sub-packages
> > > than
> > > > renamed new classes, and my motivations are that our current
> packaging
> > is
> > > > already quite coarse-grained and sometimes ill-placed, and hence
> maybe
> > > we

Re: Posted a new article about Kafka Streams

2019-06-16 Thread Paul Whalen
I've only skimmed it so far, but great job!  The community is in serious
need of more examples of the Processor API, there really isn't that much
out there.

One thing I did notice: the iterator you get from kvStore.all() ought to be
closed to release resources when you're done with it.  This matters when
the underlying store is RocksDB, which as I understand it, allocates
additional memory off heap to iterate.  I see this bug everywhere, after
writing it many times myself over the course of many months :). It's too
bad the API can't be clearer, but I guess there's not a ton you can do
in Java.  People think about this kind of thing for DB calls, but when
you're using something that's basically a HashMap you really don't think of
it at all.
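
A minimal sketch of the fix (store types assumed for illustration):
KeyValueIterator implements Closeable, so try-with-resources releases the
underlying RocksDB resources even if iteration throws.

  try (KeyValueIterator<String, Long> iter = kvStore.all()) {
      while (iter.hasNext()) {
          KeyValue<String, Long> entry = iter.next();
          // ... use entry.key and entry.value ...
      }
  }  // iter.close() runs automatically here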

Side plug for KIP-401 since you're using the Processor API, it would be
interesting to hear on that discussion thread if you find it useful (
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756).
It seems like there's soft interest, but maybe not yet enough to push it
over the finish line.

Again, great job!

Paul

On Fri, Jun 14, 2019 at 10:33 AM Development  wrote:

> Bad link:
>
> https://medium.com/@daniyaryeralin/utilizing-kafka-streams-processor-api-and-implementing-custom-aggregator-6cb23d00eaa7
>
> > On Jun 14, 2019, at 11:07 AM, Development  wrote:
> >
> > Hello Kafka Dev community,
> >
> > I wrote an article on implementing a custom transformer using Processor
> API for Kafka Streams!
> >
> https://medium.com/@daniyaryeralin/utilizing-kafka-streams-processor-api-and-implementing-custom-aggregation-f6a4a6c376be
> <
> https://medium.com/@daniyaryeralin/utilizing-kafka-streams-processor-api-and-implementing-custom-aggregation-f6a4a6c376be
> >
> > Feel free to leave a feedback and/or corrections if I wrote something
> silly :)
> >
> > Thank you!
> >
> > Best,
> > Daniyar Yeralin
>
>


Re: kafka connect stops consuming data when kafka broker goes down

2019-06-05 Thread Paul Whalen
It’s not totally clear, but this may be 
https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-7941

For which there is a fix that is very nearly approved: 
https://github.com/apache/kafka/pull/6283

Paul

> On Jun 5, 2019, at 1:26 AM, Srinivas, Kaushik (Nokia - IN/Bangalore) 
>  wrote:
> 
> Hello,
> Anyone has any information on this issue.
> Created a critical ticket for the same, since this is a major stability issue 
> for connect framework.
> https://issues.apache.org/jira/browse/KAFKA-8485?filter=-2
> 
> Thanks.
> Kaushik,
> NOKIA
> 
> From: Srinivas, Kaushik (Nokia - IN/Bangalore)
> Sent: Monday, June 03, 2019 5:22 PM
> To: dev@kafka.apache.org
> Cc: Basil Brito, Aldan (Nokia - IN/Bangalore) 
> Subject: kafka connect stops consuming data when kafka broker goes down
> 
> Hello kafka dev,
> 
> We are encountering an issue when kafka connect is running hdfs sink 
> connector pulling data from kafka and writing to hdfs location.
> In between when the data is flowing in the pipeline from producer -> kafka 
> topic -> kafka connect hdfs sink connector -> hdfs,
> If even one of the Kafka brokers goes down, the Connect framework stops 
> responding. It stops consuming records and the REST API also becomes unresponsive.
> 
> Until the Kafka Connect framework is restarted, it will not pull data 
> from Kafka and the REST API remains inactive. Nothing shows up in the logs 
> either.
> We checked the topics in Kafka used by Connect; everything has been reassigned 
> to another broker and has a leader available.
> 
> Has anyone encountered this issue ? what would be the expected behavior ?
> 
> Thanks in advance
> Kaushik


Re: [VOTE] KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting

2019-05-27 Thread Paul Whalen
I spoke too early a month ago, but I believe the proposal is finalized now
and ready for voting.

KIP:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

Discussion:
https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E

Pull request (still a WIP, obviously):
https://github.com/apache/kafka/pull/6824

Thanks,
Paul

On Wed, Apr 24, 2019 at 8:00 PM Paul Whalen  wrote:

> Hi all,
>
> After some good discussion on and adjustments to KIP-401 (which I renamed
> slightly for clarity), chatter has died down so I figured I may as well
> start a vote.
>
> KIP:
> TransformerSupplier/ProcessorSupplier StateStore connecting
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756>
> Discussion:
>
> https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E
>
> Thanks!
> Paul
>


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-27 Thread Paul Whalen
It wasn't much of a lift changing option B to work for option C, so I
closed that PR and made a new one, which should be identical to the KIP
right now: https://github.com/apache/kafka/pull/6824.  There are a few
todos still which I will hold off until the KIP is accepted.

I created a voting thread about a month ago, so I'll bump that now that
we're nearly there.

Paul

On Sun, May 26, 2019 at 2:21 PM Paul Whalen  wrote:

> Per Matthias's suggestion from a while ago, I actually implemented a good
> amount of option B to get a sense of the user experience and documentation
> requirements.  For a few reasons mentioned below, I think it's not my
> favorite option, and I prefer option C.  But since I did the work and it
> can help discussion, I may as well share:
> https://github.com/apache/kafka/pull/6821.
>
> Things I learned along the way implementing Option B:
>  - For the name of the interface, I like ConnectedStoreProvider.  It isn't
> perfect but it seems to capture the general gist without being overly
> verbose.  I get that from a strict standpoint it's not "providing connected
> stores" but is instead "providing stores to be connected," but I think that
> in context and with documentation, the risk of someone being confused by
> that is low.
>  - I definitely felt the discoverability issue while trying to write clear
> documentation; you really have to make sure to connect the dots for the
> user when the interface isn't connected to anything.
>  - Another problem with a separate interface found while writing
> tests/examples: defining a ProcessorSupplier that also implements
> ConnectedStoreProvider cannot be done anonymously, since you can't define
> an anonymous class in Java that implements multiple interfaces.  I actually
> consider this a fairly major usability issue - it means a user always has
> to have a custom class rather than doing it inline.  We could provide an
> abstract class that implements the two, but at that point, we're not that
> far from option A or C anyway.
>
> I updated the KIP with my current thinking, which as mentioned is
> Matthias's option C.  Once again for clarity, that *is not* what is in the
> linked pull request.  The current KIP is my proposal.
>
> Thanks everyone for the input!
>
> P.S.  What do folks use to edit the HTML documentation, e.g.
> processor-api.html?  I looked at doing it by hand but it kind of looked
> like agony with all the small tags required for formatting code, so I'm
> sort of assuming there's tooling for it.
>
> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax 
> wrote:
>
>> I think the discussion mixed approaches a little bit, hence, let me
>> rephrase my understanding:
>>
>>
>> A) add new method with default implementation to `ProcessorSupplier`:
>>
>> For this case, we don't add a new interface, but only add a new method
>> to `ProcessorSupplier` -- to keep backward compatibility, we need to add
>> a default implementation. Users opt into the new feature by overwriting
>> the default implementation.
>>
>>
>> B) We add a new interface with new method:
>>
>> For this case, `ProcessorSupplier` interface is not changed and it does
>> also _not_ extend the new interface. Because `ProcessorSupplier` is not
>> changed, it's naturally backward compatible. Users opt into the new
>> feature, by adding the new interface to their ProcessorSupplier
>> implementation and they need to implement the new method because there
>> is no default implementation. Kafka Streams can use `instanceof` to
>> detect if the new interface is used or not and thus do the right thing.
>>
>>
>> What was also discussed is a mix of both:
>>
>> C) We add a new interface with new method and let `ProcessorSupplier`
>> extend the new interface:
>>
>> Here, we need to add a default implementation to preserve backward
>> compatibility. Similar to (A), users opt into the feature by overwriting
>> the default implementation.
>>
>>
>>
>> Option (C) is the same as (A) from a user point of view because a user
>> won't care about the new interface. It only makes a difference for our
>> code base, as we can share the default implementation of the new method.
>> This is only a small gain, as the implementation is trivial, but also a
>> small drawback, as we add a new public interface that is useless to the
>> user because the user would never implement the interface directly.
>>
>>
>>
>> For (A/C), it might be simpler for users to detect the feature. For (B),
>> we have the advantage that users must impl

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-26 Thread Paul Whalen
 me atm what your final proposal is because you
> mentioned that you might want to rename `StateStoreConnector`? It's also
> unclear to me atm, if you prefer (A), (B), or (C).
>
> Maybe you can update the KIP if necessary and clearly state what your
> final proposal is. Besides this, it seems we can move to a VOTE?
>
>
>
> -Matthias
>
>
>
>
>
> On 5/2/19 3:01 PM, Bruno Cadonna wrote:
> > Hi Paul,
> >
> > I will try to express myself a bit clearer.
> >
> > Ad 1)
> > My assumption is that if `StateStoreConnector#stateStores()` returns
> `null`
> > Kafka Streams will throw an NPE because on purpose no null check is
> > performed before the loop that calls `StreamsBuilder#addStateStore()`.
> When
> > the user finally understands the cause of the NPE, she knows that she has
> > to override `StateStoreConnector#stateStores()` in her implementation. My
> > question was, why let the user discover that she has to overwrite the
> > method at runtime if you could instead not provide a default implementation for
> > `StateStoreConnector#stateStores()` and let the compiler tell the user
> the
> > need to overwrite the method. Not providing a default implementation
> > without separating the interfaces implies not being backward-compatible.
> > That means, if we choose to not provide a default implementation and let
> > the compiler signal the necessity to override the method, we have to
> > separate the interfaces in any case.
> >
> > Ad 2)
> > If you check for `null` or empty list in `process` and do not call
> > `addStateStores` in those cases, the advantage of returning `null` to be
> > saver to detect bugs as mentioned by Matthias would be lost. But maybe I
> am
> > missing something here.
> >
> > Best,
> > Bruno
> >
> >
> >
> > On Wed, May 1, 2019 at 6:27 AM Paul Whalen  wrote:
> >
> >> I definitely don't mind anyone jumping, Bruno, thanks for the comments!
> >>
> >> 1) I'm not totally sure I'm clear on your point, but I think we're on
> the
> >> same page - if we're adding a method to the XSupplier interfaces (by
> making
> >> them inherit from a super interface StateStoreConnector) then we
> definitely
> >> need a default implementation to maintain compatibility.  Whether the
> >> default implementation returns null or an empty list is somewhat of a
> >> detail.
> >>
> >> 2) If stream.process() sees that StateStoreConnector#stateStores()
> returns
> >> either null or an empty list, it would handle that case specifically and
> >> not try to call addStateStore at all.  Or is this not what you're
> asking?
> >>
> >> Separately, I'm still hacking away at the details of the PR and will
> >> continue to get something into a discussable state, but I'll share some
> >> thoughts I've run into.
> >>
> >> A) I'm tentatively going the separate interface route (Matthias's
> >> suggestion) and naming it ConnectedStoreProvider.  Still don't love the
> >> name, but there's something nice about the name indicating *why* this
> thing
> >> is providing the store, not just that it is providing it.
> >>
> >> B) It has occurred to me that topology.addProcessor() could also
> recognize
> >> if ProcessorSupplier implements ConnectedStoreProvider and add and
> connect
> >> stores appropriately.  This isn't in the KIP and I think the value-add
> is
> >> lower (if you're reaching that low level, surely the "auto add/connect
> >> store" isn't too important to you), but I think it would be a confusing
> if
> >> it didn't, and I don't see any real downside.
> >>
> >> Paul
> >>
> >> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna 
> wrote:
> >>
> >>> Hi,
> >>>
> >>> @Paul: Thank you for the KIP!
> >>>
> >>> I hope you do not mind that I jump in.
> >>>
> >>> I have the following comments:
> >>>
> >>> 1) `null` vs empty list in the default implementation
> >>> IIUC, returning `null` in the default implementation should basically
> >>> signal that the method `stateStores` was not overridden. Why then
> >> provide a
> >>> default implementation in the first place? Without default
> implementation
> >>> you would discover the missing implementation already at compile-time
> and
> >>> not only at runtime. If you decide

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Paul Whalen
Ivan, I’ll definitely forfeit my point on the clumsiness of the
branch(predicate, consumer) solution; I don’t see any real drawbacks for the
dynamic case.

IMO the one trade-off to consider at this point is the scope question. I don’t
know if I totally agree that “we rarely need them in the same scope” since 
merging the branches back together later seems like a perfectly plausible use 
case that can be a lot nicer when the branched streams are in the same scope. 
That being said, for the reasons Ivan listed, I think it is overall the better 
solution - working around the scope thing is easy enough if you need to. 
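
For what it's worth, a hedged sketch of that workaround under the KIP draft's
branch(predicate, consumer) shape (the Purchase type and predicates are
invented): because each consumer runs as its branch() call is made, the
branched streams can be collected into local scope and merged afterwards.

  List<KStream<String, Purchase>> collected = new ArrayList<>();

  stream.split()
        .branch((key, value) -> value.isCoffee(), collected::add)
        .branch((key, value) -> value.isElectronics(), collected::add)
        .defaultBranch(other -> { /* ignore the rest */ });

  // both consumers have already run, so the list is populated here
  KStream<String, Purchase> rejoined = collected.get(0).merge(collected.get(1));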

> On May 2, 2019, at 7:00 PM, Ivan Ponomarev  wrote:
> 
> Hello everyone, thank you all for joining the discussion!
> 
> Well, I don't think the idea of named branches, be it a LinkedHashMap (no
> other Map will do, because order of definition matters) or a `branch` method
> taking a name and a Consumer, has more advantages than drawbacks.
> 
> In my opinion, the only real positive outcome from Michael's proposal is that 
> all the returned branches are in the same scope. But 1) we rarely need them 
> in the same scope 2) there is a workaround for the scope problem, described 
> in the KIP.
> 
> 'Inlining the complex logic' is not a problem, because we can use method 
> references instead of lambdas. In real-world scenarios you tend to split the
> complex logic into methods anyway, so the code is going to be clean.
> 
> The drawbacks are strong. The cohesion between predicates and handlers is 
> lost. We have to define predicates in one place, and handlers in another. 
> This opens the door for bugs:
> 
> - what if we forget to define a handler for a name? or a name for a handler?
> - what if we misspell a name?
> - what if we copy-paste and duplicate a name?
> 
> What Michael proposes would have been totally OK if we had been writing the
> API in Lua, Ruby or Python. In those languages the "dynamic naming" approach 
> would have looked most concise and beautiful. But in Java we expect all the 
> problems related to identifiers to be eliminated in compile time.
> 
> Do we have to invent duck-typing for the Java API?
> 
> And if we do, what advantage are we supposed to get besides having all the 
> branches in the same scope? Michael, maybe I'm missing your point?
> 
> ---
> 
> Earlier in this discussion John Roesler also proposed to do without "start 
> branching" operator, and later Paul mentioned that in the case when we have 
> to add a dynamic number of branches, the current KIP is 'clumsier' compared 
> to Michael's 'Map' solution. Let me address both comments here.
> 
> 1) "Start branching" operator (I think that *split* is a good name for it 
> indeed) is critical when we need to do dynamic branching; see the example below.
> 
> 2) No, dynamic branching in current KIP is not clumsy at all. Imagine a 
> real-world scenario when you need one branch per enum value (say, 
> RecordType). You can have something like this:
> 
> /* John: if we had to start with stream.branch(...) here, it would have been
> much messier. */
> KBranchedStream branched = stream.split();
> 
> /* Not clumsy at all :-) */
> for (RecordType recordType : RecordType.values())
>     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>                                recordType::processRecords);
> 
> Regards,
> 
> Ivan
> 
> 
> 02.05.2019 14:40, Matthias J. Sax wrote:
>> I also agree with Michael's observation about the core problem of
>> current `branch()` implementation.
>> 
>> However, I also don't like to pass in a clumsy Map object. My thinking
>> was more aligned with Paul's proposal to just add a name to each
>> `branch()` statement and return a `Map`.
>> 
>> It makes the code easier to read, and also make the order of
>> `Predicates` (that is essential) easier to grasp.
>> 
>>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>    .branch("branchTwo", Predicate<K, V>)
>>>>>>    .defaultBranch("defaultBranch");
>> An open question is the case for which no defaultBranch() should be
>> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>> and the call to `defaultBranch()` that returns the `Map` is mandatory
>> (which is not the case atm). Or is this actually not a real problem,
>> because users can just ignore the branch returned by `defaultBranch()`
>> in the result `Map` ?
>> 
>> 
>> About "inlining": So far, it seems to be a matter of personal

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-01 Thread Paul Whalen
Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
with the full downstream topology be defined inline - it can be a method 
reference as with Ivan’s original suggestion.  The advantage of putting the 
predicate and its downstream logic (Consumer) together in branch() is that they 
are required to be near each other.

Ultimately the downstream code has to live somewhere, and deep branch trees 
will be hard to read regardless.

> On May 1, 2019, at 1:07 PM, Michael Drogalis  
> wrote:
> 
> I'm less enthusiastic about inlining the branch logic with its downstream
> functionality. Programs that have deep branch trees will quickly become
> harder to read as a single unit.
> 
>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:
>> 
>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>> great framework for the discussion.
>> 
>> Regarding the SortedMap solution, my understanding is that the current
>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>> roughly this:
>> 
>> stream.split()
>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>    .defaultBranch(Consumer<KStream<K, V>>);
>> 
>> Obviously some ordering is necessary, since branching as a construct
>> doesn't work without it, but this solution seems like it provides as much
>> associativity as the SortedMap solution, because each branch() call
>> directly associates the "conditional" with the "code block."  The value it
>> provides over the KIP solution is the accessing of streams in the same
>> scope.
>> 
>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>> that it is slightly clumsier to add a dynamic number of branches, but it is
>> certainly possible.  It seems to me like the API should favor the "static"
>> case anyway, and should make it simple and readable to fluently declare and
>> access your branches in-line.  It also makes it impossible to ignore a
>> branch, and it is possible to build an (almost) identical SortedMap
>> solution on top of it.
>> 
>> I could also see a middle ground where instead of a raw SortedMap being
>> taken in, branch() takes a name and not a Consumer.  Something like this:
>> 
>> Map<String, KStream<K, V>> branches = stream.split()
>>    .branch("branchOne", Predicate<K, V>)
>>    .branch("branchTwo", Predicate<K, V>)
>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>> 
>> Pros for that solution:
>> - accessing branched KStreams in same scope
>> - no double brace initialization, hopefully slightly more readable than
>> SortedMap
>> 
>> Cons
>> - downstream branch logic cannot be specified inline which makes it harder
>> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>> - you can forget to "handle" one of the branched streams (like existing
>> API and SortedMap, but unlike the KIP)
>> 
>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>> it).
>> 
>> Overall I'm curious how important it is to be able to easily access the
>> branched KStream in the same scope as the original.  It's possible that it
>> doesn't need to be handled directly by the API, but instead left up to the
>> user.  I'm sort of in the middle on it.
>> 
>> Paul
>> 
>> 
>> 
>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
>> wrote:
>> 
>>> I'd like to +1 what Michael said about the issues with the existing
>> branch
>>> method, I agree with what he's outlined and I think we should proceed by
>>> trying to alleviate these problems. Specifically it seems important to be
>>> able to cleanly access the individual branches (eg by mapping
>>> name->stream), which I thought was the original intention of this KIP.
>>> 
>>> That said, I don't think we should so easily give in to the double brace
>>> anti-pattern or force our users into it if at all possible to
>> avoid...just
>>> my two cents.
>>> 
>>> Cheers,
>>> Sophie
>>> 
>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>> michael.droga...@confluent.io> wrote:
>>> 
>>>> I’d like to propose a different way of thinking about this. To me,
>> there
>>>> are three problems with the existing branch signature:
>>>> 
>>>> 1. If you use it the way most people do, Java raises unsafe type
>>> warnings.

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-30 Thread Paul Whalen
I definitely don't mind anyone jumping, Bruno, thanks for the comments!

1) I'm not totally sure I'm clear on your point, but I think we're on the
same page - if we're adding a method to the XSupplier interfaces (by making
them inherit from a super interface StateStoreConnector) then we definitely
need a default implementation to maintain compatibility.  Whether the
default implementation returns null or an empty list is somewhat of a
detail.

2) If stream.process() sees that StateStoreConnector#stateStores() returns
either null or an empty list, it would handle that case specifically and
not try to call addStateStore at all.  Or is this not what you're asking?

Separately, I'm still hacking away at the details of the PR and will
continue to get something into a discussable state, but I'll share some
thoughts I've run into.

A) I'm tentatively going the separate interface route (Matthias's
suggestion) and naming it ConnectedStoreProvider.  Still don't love the
name, but there's something nice about the name indicating *why* this thing
is providing the store, not just that it is providing it.

B) It has occurred to me that topology.addProcessor() could also recognize
if ProcessorSupplier implements ConnectedStoreProvider and add and connect
stores appropriately.  This isn't in the KIP and I think the value-add is
lower (if you're reaching that low level, surely the "auto add/connect
store" isn't too important to you), but I think it would be a confusing if
it didn't, and I don't see any real downside.
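
To make the shape concrete, a rough sketch of the separate-interface route
(signatures were still open at this point in the thread; MyProcessor and the
store builder are placeholders):

  public interface ConnectedStoreProvider {
      Set<StoreBuilder<?>> stores();
  }

  class MyProcessorSupplier
          implements ProcessorSupplier<String, Long>, ConnectedStoreProvider {

      private final StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
          Stores.keyValueStoreBuilder(
              Stores.persistentKeyValueStore("my-store"),  // placeholder name
              Serdes.String(), Serdes.Long());

      @Override
      public Processor<String, Long> get() {
          return new MyProcessor();  // placeholder Processor implementation
      }

      @Override
      public Set<StoreBuilder<?>> stores() {
          return Collections.<StoreBuilder<?>>singleton(storeBuilder);
      }
  }

  // stream.process() (and, per point B, topology.addProcessor()) could then
  // detect the supplier via instanceof and add/connect the returned builders.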

Paul

On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna  wrote:

> Hi,
>
> @Paul: Thank you for the KIP!
>
> I hope you do not mind that I jump in.
>
> I have the following comments:
>
> 1) `null` vs empty list in the default implementation
> IIUC, returning `null` in the default implementation should basically
> signal that the method `stateStores` was not overridden. Why then provide a
> default implementation in the first place? Without default implementation
> you would discover the missing implementation already at compile-time and
> not only at runtime. If you decide not to provide a default implementation,
> `XSupplier extends StateStoreConnector` would break existing code as
> Matthias has already pointed out.
>
> 2) `process` method adding the StoreBuilders to the topology
> If the default implementation returned `null` and `XSupplier extends
> StateStoreConnector`, then existing code would break, because
> `StreamsBuilder#addStateStore()` would throw an NPE.
>
> +1 for opening a WIP PR
>
> Best,
> Bruno
>
>
> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
> wrote:
>
> > Thank Paul!
> >
> > I agree with all of that. If we think that the general design is good,
> > refactoring a PR if we want to pick a different name should not be too
> > much additional work (hopefully). Thus, if you want to open a WIP PR and
> > we use it to nail down the open details, it might help to find a good
> > conclusion.
> >
> > >> 2) Default method vs new interface:
> >
> > This seems to be the hardest tradeoff. I see the point about
> > discoveability... Might be good to get input from others, which version
> > they would prefer.
> >
> > Just to make clear, my suggestion from the last email was, that
> > `Transformer` etc does not extend the new interface. Instead, a user
> > that want to use this feature would need to implement both interfaces.
> >
> > If `Transformer extends StoreProvider` (just picking a name here)
> > without a default implementation, existing code would break, and thus it is
> > not an option because it breaks backward compatibility.
> >
> >
> > -Matthias
> >
> > On 4/28/19 8:37 PM, Paul Whalen wrote:
> > > Great thoughts Matthias, thanks! I think we're all agreed that naming
> and
> > > documentation/education are the biggest hurdles for this KIP, and in
> > light
> > > of that, I think it makes sense for me to just take a stab at a full
> > > fledged PR with documentation to convince us that it's possible to do
> it
> > > with enough clarity.
> > >
> > > In response to your specific thoughts:
> > >
> > > 1) StateStoreConnector as a name: Really good point about defining the
> > > difference between "adding" and "connecting."  Guozhang suggested
> > > StateStoreConnector which was definitely an improvement over my
> > > StateStoresSupplier, but I think you're right that we need to be
> careful
> > to
> > > make it clear that it's really accomplishing both.  Thinking about it
> 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Paul Whalen
)
> > > > >
> > > > > I can see your point: this is to make the name similar to
> > String#split
> > > > > that also returns an array, right? But is it worth the loss of
> > > backwards
> > > > > compatibility? We can have overloaded branch() as well without
> > > affecting
> > > > > the existing code. Maybe the old array-based `branch` method should
> > be
> > > > > deprecated, but this is a subject for discussion.
> > > > >
> > > > >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > > > > KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > > > >
> > > > > Totally agree with 'addBranch->branch' rename. 'default' is,
> > however, a
> > > > > reserved word, so unfortunately we cannot have a method with such a
> > name
> > > > :-)
> > > > >
> > > > >> defaultBranch() does take a `Predicate` as argument, but I think
> > that
> > > > > is not required?
> > > > >
> > > > > Absolutely! I think that was just a copy-paste error or something.
> > > > >
> > > > > Dear colleagues,
> > > > >
> > > > > please revise the new version of the KIP and Paul's PR
> > > > > (https://github.com/apache/kafka/pull/6512)
> > > > >
> > > > > Any new suggestions/objections?
> > > > >
> > > > > Regards,
> > > > >
> > > > > Ivan
> > > > >
> > > > >
> > > > > 11.04.2019 11:47, Matthias J. Sax пишет:
> > > > >> Thanks for driving the discussion of this KIP. It seems that
> > everybody
> > > > >> agrees that the current branch() method using arrays is not
> optimal.
> > > > >>
> > > > >> I had a quick look into the PR and I like the overall proposal.
> > There
> > > > >> are some minor things we need to consider. I would recommend the
> > > > >> following renaming:
> > > > >>
> > > > >> KStream#branch() -> #split()
> > > > >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> > > > >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > > > >>
> > > > >> It's just a suggestion to get slightly shorter method names.
> > > > >>
> > > > >> In the current PR, defaultBranch() does take a `Predicate` as
> > > argument,
> > > > >> but I think that is not required?
> > > > >>
> > > > >> Also, we should consider KIP-307, that was recently accepted and
> is
> > > > >> currently implemented:
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > >>
> > > > >> Ie, we should add overloads that accepted a `Named` parameter.
> > > > >>
> > > > >>
> > > > >> For the issue that the created `KStream` object are in different
> > > scopes:
> > > > >> could we extend `KBranchedStream` with a `get(int index)` method
> > that
> > > > >> returns the corresponding "branched" result `KStream` object?
> Maybe,
> > > the
> > > > >> second argument of `addBranch()` should not be a
> `Consumer`
> > > but
> > > > >> a `Function` and `get()` could return whatever
> the
> > > > >> `Function` returns?
> > > > >>
> > > > >>
> > > > >> Finally, I would also suggest to update the KIP with the current
> > > > >> proposal. That makes it easier to review.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >>
> > > > >> On 3/31/19 12:22 PM, Paul Whalen wrote:
> > > > >>> Ivan,
> > > > >>>
> > > > >>> I'm a bit of a novice here as well, but I think it makes sense
> for
> > > you
> > > > to
> > > > >>> revise the KIP and continue the discussion.  Obviously we'll need
> > > some
> > > 

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-28 Thread Paul Whalen
he
> feature. I am wondering if it might be better to not add the new interface
> to `ProcessorSupplier` etc and to just provide a new interface with no
> default implementation. Users would opt-in by adding the interface
> explicitly to their existing `ProcessorSupplier` implementation.
> Overwriting a default method and getting different behavior seems to be
> a little subtle to me, especially because we don't want to allow
> mixing and matching the old and new approaches. Think: I only overwrite a
> default method and my code breaks.
>
> Thoughts?
>
>
>
> (3) If we keep the current default implementation for the new method, I
> am wondering if it should return `null` instead of an empty collection?
> This might be safer for detecting bugs in user code for which, by accident,
> an empty collection could be returned.
>
>
>
> (4) Should the new method return a `Set` instead of a `Collection` to
> indicate the semantics clearly (ie, returning the same `StoreBuilder`
> multiple times is idempotent and one cannot add+connect to it twice).
>
>
>
> -Matthias
>
>
>
>
> On 4/6/19 12:27 PM, Paul Whalen wrote:
> > Ivan and Guozhang,
> >
> > Thanks for the thoughts!  Ivan's use case is definitely interesting.  The
> > way I see it, if we can achieve the main goal of the KIP (allowing
> > Processor/TransformerSuppliers to encapsulate their usage of state
> stores),
> > we will enable this kind of thing in "user space" very easily.
> >
> > I will say that I'm not totally sure that most use cases of transform()
> use
> > just one state store.  It's hard to know since I haven't seen many
> examples
> > in public, but my team's usages almost exclusively require multiple state
> > stores.  We only reach for the low level processor API when we need that
> > complexity, and it's somewhat hard to imagine many use cases that only
> need
> > one state store, since the high level DSL can usually accomplish those
> > tasks.  The example Ivan presented for instance looks like a
> > stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious what sort of
> > other usages you're imagining.
> >
> > That being said, perhaps the Processor API should really just be
> considered
> > a separate paradigm in Streams, not just a lower level that we reach to
> > when necessary.  In which case it would be beneficial to make the simple
> > use cases easier.  I've definitely talked about this with my own team -
> if
> > you're less familiar with the kind of functional style that the high
> level
> > DSL offers, it might be easier to "see" your state and interact with it
> > directly.
> >
> > Anyway, I've updated the KIP to reflect my current PR with Guozhang's
> > suggestions.  It seems like there is at least some interest in that on
> its
> > own and not a ton of pushback, so I think I will try to start a vote.
> >
> > Paul
> >
> > On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev 
> wrote:
> >
> >> Hi all!
> >>
> >> I was about to write another KIP, but found out that KIP-401 addresses
> >> exactly the problem I faced. So let me jump into your discussion and ask
> >> you to assess another idea.
> >>
> >> I fully agree with the KIP-401's motivation part. E.g., in my project I
> had
> >> to invent a wrapper class that hides the details of KeyValueStore
> >> management from business logic. Of course this should be done better in
> >> KStreams API.
> >>
> >> But I was about to look at this problem from another side and propose a
> >> simple alternative in high-level DSL, that will not fit all the cases,
> but
> >> most of them. Hence my idea does not exclude the Paul's proposal.
> >>
> >> What if we restrict ourselves to *only one* KeyValueStore and propose a
> >> method that resembles  `aggregate` and `reduce` methods, like this:
> >>
> >> stream
> >>    .map(...)
> >>    .filter(...)
> >>    .transform((k, v, s) -> {}, Transformed.with())
> >>
> >> where
> >> * k, v -- input key & value
> >> * s -- a KeyValueStore provided as an argument
> >> * return value of the lambda should be KeyValue.pair(...)
> >> * Transformed.with... is a builder, used in order to define the
> >> Transformer and KeyValueStore building parameters. Some of these
> parameters
> >> should be:
> >> ** store's KeySerde,
> >> ** store's ValueSerde,
> >> ** 

[VOTE] KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting

2019-04-24 Thread Paul Whalen
Hi all,

After some good discussion on and adjustments to KIP-401 (which I renamed
slightly for clarity), chatter has died down so I figured I may as well
start a vote.

KIP:
TransformerSupplier/ProcessorSupplier StateStore connecting

Discussion:
https://lists.apache.org/thread.html/600996d83d485f2b8daf45037de64a60cebdfac9b234bf3449b6b753@%3Cdev.kafka.apache.org%3E

Thanks!
Paul


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-06 Thread Paul Whalen
Ivan and Guozhang,

Thanks for the thoughts!  Ivan's use case is definitely interesting.  The
way I see it, if we can achieve the main goal of the KIP (allowing
Processor/TransformerSuppliers to encapsulate their usage of state stores),
we will enable this kind of thing in "user space" very easily.

I will say that I'm not totally sure that most use cases of transform() use
just one state store.  It's hard to know since I haven't seen many examples
in public, but my team's usages almost exclusively require multiple state
stores.  We only reach for the low level processor API when we need that
complexity, and it's somewhat hard to imagine many use cases that only need
one state store, since the high level DSL can usually accomplish those
tasks.  The example Ivan presented for instance looks like a
stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious what sort of
other usages you're imagining.

That being said, perhaps the Processor API should really just be considered
a separate paradigm in Streams, not just a lower level that we reach for
when necessary.  In which case it would be beneficial to make the simple
use cases easier.  I've definitely talked about this with my own team - if
you're less familiar with the kind of functional style that the high level
DSL offers, it might be easier to "see" your state and interact with it
directly.

Anyway, I've updated the KIP to reflect my current PR with Guozhang's
suggestions.  It seems like there is at least some interest in that on its
own and not a ton of pushback, so I think I will try to start a vote.

Paul

On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev  wrote:

> Hi all!
>
> I was about to write another KIP, but found out that KIP-401 addresses
> exactly the problem I faced. So let me jump into your discussion and ask
> you to assess another idea.
>
> I fully agree with the KIP-401's motivation part. E.g., in my project I had
> to invent a wrapper class that hides the details of KeyValueStore
> management from business logic. Of course this should be done better in
> KStreams API.
>
> But I was about to look at this problem from another side and propose a
> simple alternative in high-level DSL, that will not fit all the cases, but
> most of them. Hence my idea does not exclude Paul's proposal.
>
> What if we restrict ourselves to *only one* KeyValueStore and propose a
> method that resembles  `aggregate` and `reduce` methods, like this:
>
> stream
>     .map(...)
>     .filter(...)
>     .transform((k, v, s) -> {}, Transformed.with())
>
> where
> * k, v -- input key & value
> * s -- a KeyValueStore provided as an argument
> * return value of the lambda should be KeyValue.pair(...)
> * Transformed.with... is a builder, used in order to define the
> Transformer and KeyValueStore building parameters. Some of these parameters
> should be:
> ** store's KeySerde,
> ** store's ValueSerde,
> ** whether the store is persistent or in-memory,
> ** store's name -- optional parameter, the system should be able to devise
> the name of the store transparently for the user, if we don't want to
> devise it ourselves/share the store between processors.
> ** scheduled punctuation.
>
> Imagine we have a KStream, and we need to calculate a
> `derivative` stream, that is, a stream of 'deltas' of the provided integer
> values.
>
> This could be achieved as simple as
>
> stream.transform((key, value, stateStore) -> {
>         int previousValue =
>             Optional.ofNullable(stateStore.get(key)).orElse(0);
>         stateStore.put(key, value);
>         return KeyValue.pair(key, value - previousValue);
>     }
>     // we do not need to bother with store name, punctuation etc.;
>     // maybe even the Serde part can be omitted, since we can inherit the
>     // serdes from the stream by default
>     , Transformed.with(Serdes.String(), Serdes.Integer())
> );
>
> The hard part of it is that new `transform` method definition should be
> parameterized by six type parameters:
>
> * input/output/KeyValueStore key type,
> * input/output/KeyValueStore value type.
>
> However, it seems that all these types can be inferred from the provided
> lambda and Transformed.with instances.
>
> What do you think about this?
>
> Regards,
>
> Ivan
>
>
> 27.03.2019 20:45, Guozhang Wang wrote:
>
> Hello Paul,
>
> Thanks for the uploaded PR and the detailed description! I've made a pass
> on it and left some comments.
>
> Overall I think I agree with you that passing in the StoreBuilder directly
> rather than the store name is more convenient as it does not require another
> `addStore` call, but we just need to spend some more docu

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-31 Thread Paul Whalen
Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this is a good
idea overall.  I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move
forward with the solution of "stream.branch() returns KBranchedStream", do we
deprecate "stream.branch(...) returns KStream[]"?  I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway.  We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists, which get modified in
KBranchedStream but read all the way down in KStreamLazyBranch, is a bit
complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev  wrote:

> Hi Paul!
>
> I read your code carefully and now I am fully convinced: your proposal
> looks better and should work. We just have to document the crucial fact
> that KStream consumers are invoked as they're added. And then it's all
> going to be very nice.
>
> What shall we do now? I should re-write the KIP and resume the
> discussion here, right?
>
> Why are you saying that your PR 'should not be even a starting point if
> we go in this direction'? To me it looks like a good starting point. But
> as a novice in this project I might miss some important details.
>
> Regards,
>
> Ivan
>
>
> 28.03.2019 17:38, Paul Whalen wrote:
> > Ivan,
> >
> > Maybe I’m missing the point, but I believe the stream.branch() solution
> supports this. The couponIssuer::set* consumers will be invoked as they’re
> added, not during streamsBuilder.build(). So the user still ought to be
> able to call couponIssuer.coupons() afterward and depend on the branched
> streams having been set.
> >
> > The issue I mean to point out is that it is hard to access the branched
> streams in the same scope as the original stream (that is, not inside the
> couponIssuer), which is a problem with both proposed solutions. It can be
> worked around though.
> >
> > [Also, great to hear additional interest in 401, I’m excited to hear
> your thoughts!]
> >
> > Paul
> >
> >> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:
> >>
> >> Hi Paul!
> >>
> >> The idea to postpone the wiring of branches to the
> streamsBuilder.build() also looked great for me at first glance, but ---
> >>
> >>> the newly branched streams are not available in the same scope as each
> other.  That is, if we wanted to merge them back together again I don't see
> a way to do that.
> >>
> >> You just took the words right out of my mouth, I was just going to
> write in details about this issue.
> >>
> >> Consider the example from Bill's book, p. 101: say we need to identify
> customers who have bought coffee and made a purchase in the electronics
> store to give them coupons.
> >>
> >> This is the code I usually write under these circumstances using my
> 'brancher' class:
> >>
> >> @Setter
> >> class CouponIssuer{
> >>private KStream<> coffePurchases;
> >>private KStream<> electronicsPurchases;
> >>
> >>KStream<...> coupons(){
> >>return coffePurchases.join(electronicsPurchases...)...whatever
> >>
> >>/*In the real world the code here can be complex, so creation of
> a separate CouponIssuer class is fully justified, in order to separate
> classes' responsibilities.*/
> >>
> >>   }
> >> }
> >>
> >> CouponIssuer couponIssuer = new CouponIssuer();
> >>
> >> new KafkaStreamsBrancher<>()
> >>  .branch(predicate1, couponIssuer::setCoffePurchases)
> >>  .branch(predicate2, couponIssuer::setElectronicsPurchases)
> >>  .onTopOf(transactionStream);
> >>
> >> /*Alas, this won't work if we're going to wire up everything later,
> without the terminal operation!!!*/
> >> couponIssuer.coupons()...
> >>
> >> Does this make sense?  In o

[jira] [Created] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name

2019-03-30 Thread Paul Whalen (JIRA)
Paul Whalen created KAFKA-8177:
--

 Summary: Allow for separate connect instances to have sink 
connectors with the same name
 Key: KAFKA-8177
 URL: https://issues.apache.org/jira/browse/KAFKA-8177
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Paul Whalen


If you have multiple Connect instances (either a single standalone or 
distributed group of workers) running against the same Kafka cluster, the 
connect instances cannot each have a sink connector with the same name and 
still operate independently. This is because the consumer group ID used 
internally for reading from the source topic(s) is entirely derived from the 
connector's name: 
[https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24]
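
For reference, the derivation at that line is essentially the following
(paraphrased, not verbatim):

  public static String consumerGroupId(String connector) {
      // depends only on the connector name, so any two Connect instances
      // running a sink connector named "my-sink" collide on the consumer
      // group "connect-my-sink"
      return "connect-" + connector;
  }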

The documentation of Connect implies to me that it supports "multi-tenancy," 
that is, as long as...
 * In standalone mode, the {{offset.storage.file.filename}} is not shared 
between instances
 * In distributed mode, {{group.id}} and {{config.storage.topic}}, 
{{offset.storage.topic}}, and {{status.storage.topic}} are not the same between 
instances

... then the connect instances can operate completely independently without 
fear of conflict.  But the sink connector consumer group naming policy makes 
this untrue. Obviously this can be achieved by uniquely naming connectors 
across instances, but in some environments that could be a bit of a nuisance, 
or a challenging policy to enforce. For instance, imagine a large group of 
developers or data analysts all running their own standalone Connect to load 
into a SQL database for their own analysis, or replicating or mirroring to 
their own local cluster for testing.

The obvious solution is to allow supplying config that gives a Connect instance 
some notion of identity, and to use that when creating the sink task consumer 
group. Distributed mode already has this obviously ({{group.id}}), but it would 
need to be added for standalone mode. Maybe {{instance.id}}? Given that 
solution it seems like this would need a small KIP.

I could also imagine solving this problem through better documentation 
("ensure your connector names are unique!"), but having that subtlety doesn't 
seem worth it to me. (Optionally) assigning identity to every Connect instance 
seems strictly more clear, without any downside.





Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-28 Thread Paul Whalen
Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution 
supports this. The couponIssuer::set* consumers will be invoked as they’re 
added, not during streamsBuilder.build(). So the user still ought to be able to 
call couponIssuer.coupons() afterward and depend on the branched streams having 
been set.

The issue I mean to point out is that it is hard to access the branched streams 
in the same scope as the original stream (that is, not inside the 
couponIssuer), which is a problem with both proposed solutions. It can be 
worked around though. 
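For example, something like this (a sketch only: branch()/addBranch()/noDefault() 
are from the fluent API in the proof of concept, and Purchase is a placeholder 
value type):

AtomicReference<KStream<String, Purchase>> coffee = new AtomicReference<>();
AtomicReference<KStream<String, Purchase>> electronics = new AtomicReference<>();

transactionStream.branch()
    .addBranch(predicate1, coffee::set)
    .addBranch(predicate2, electronics::set)
    .noDefault();

// since the consumers run as branches are added, both are now set and in the
// same scope as transactionStream
KStream<String, Purchase> merged = coffee.get().merge(electronics.get());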

[Also, great to hear additional interest in 401, I’m excited to hear your 
thoughts!]

Paul

> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:
> 
> Hi Paul!
> 
> The idea to postpone the wiring of branches to the streamsBuilder.build() 
> also looked great for me at first glance, but ---
> 
> > the newly branched streams are not available in the same scope as each 
> > other.  That is, if we wanted to merge them back together again I don't see 
> > a way to do that.
> 
> You just took the words right out of my mouth, I was just going to write in 
> details about this issue.
> 
> Consider the example from Bill's book, p. 101: say we need to identify 
> customers who have bought coffee and made a purchase in the electronics store 
> to give them coupons.
> 
> This is the code I usually write under these circumstances using my 
> 'brancher' class:
> 
> @Setter
> class CouponIssuer{
>   private KStream<> coffePurchases;
>   private KStream<> electronicsPurchases;
> 
>   KStream<...> coupons(){
>   return coffePurchases.join(electronicsPurchases...)...whatever
> 
>   /*In the real world the code here can be complex, so creation of a 
> separate CouponIssuer class is fully justified, in order to separate classes' 
> responsibilities.*/
> 
>  }
> }
> 
> CouponIssuer couponIssuer = new CouponIssuer();
> 
> new KafkaStreamsBrancher<>()
> .branch(predicate1, couponIssuer::setCoffePurchases)
> .branch(predicate2, couponIssuer::setElectronicsPurchases)
> .onTopOf(transactionStream);
> 
> /*Alas, this won't work if we're going to wire up everything later, without 
> the terminal operation!!!*/
> couponIssuer.coupons()...
> 
> Does this make sense?  In order to properly initialize the CouponIssuer we 
> need the terminal operation to be called before streamsBuilder.build() is 
> called.
> 
> 
> [BTW Paul, I just found out that your KIP-401 is essentially the next KIP I 
> was going to write here. I have some thoughts based on my experience, so I 
> will join the discussion on KIP-401 soon.]
> 
> Regards,
> 
> Ivan
> 
> 28.03.2019 6:29, Paul Whalen пишет:
>> Ivan,
>> I tried to make a very rough proof of concept of a fluent API based off of
>> KStream here (https://github.com/apache/kafka/pull/6512), and I think I
>> succeeded at removing both cons.
>>- Compatibility: I was incorrect earlier about compatibility issues,
>>there aren't any direct ones.  I was unaware that Java is smart enough to
>>distinguish between a branch(varargs...) returning one thing and branch()
>>with no arguments returning another thing.
>>- Requiring a terminal method: We don't actually need it.  We can just
>>build up the branches in the KBranchedStream who shares its state with the
>>ProcessorSupplier that will actually do the branching.  It's not terribly
>>pretty in its current form, but I think it demonstrates its feasibility.
>> To be clear, I don't think that pull request should be final or even a
>> starting point if we go in this direction, I just wanted to see how
>> challenging it would be to get the API working.
>> I will say though, that I'm not sure the existing solution could be
>> deprecated in favor of this, which I had originally suggested was a
>> possibility.  The reason is that the newly branched streams are not
>> available in the same scope as each other.  That is, if we wanted to merge
>> them back together again I don't see a way to do that.  The KIP proposal
>> has the same issue, though - all this means is that for either solution,
>> deprecating the existing branch(...) is not on the table.
>> Thanks,
>> Paul
>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:
>>> OK, let me summarize what we have discussed up to this point.
>>> 
>>> First, it seems that it's commonly agreed that branch API needs
>>> improvement. Motivation is given in the KIP.
>>> 
>>> There are two potential ways to do it:

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-27 Thread Paul Whalen
Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

   - Compatibility: I was incorrect earlier about compatibility issues,
   there aren't any direct ones.  I was unaware that Java is smart enough to
   distinguish between a branch(varargs...) returning one thing and branch()
   with no arguments returning another thing.
   - Requiring a terminal method: We don't actually need it.  We can just
   build up the branches in the KBranchedStream who shares its state with the
   ProcessorSupplier that will actually do the branching.  It's not terribly
   pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:

> OK, let me summarize what we have discussed up to this point.
>
> First, it seems that it's commonly agreed that branch API needs
> improvement. Motivation is given in the KIP.
>
> There are two potential ways to do it:
>
> 1. (as origianlly proposed)
>
> new KafkaStreamsBrancher<..>()
>.branch(predicate1, ks ->..)
>.branch(predicate2, ks->..)
>.defaultBranch(ks->..) //optional
>.onTopOf(stream).mapValues(...) //onTopOf returns its argument
>
> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> all the necessary ingredients are provided.
>
> CONS: The need to create a KafkaStreamsBrancher instance contrasts the
> fluency of other KStream methods.
>
> 2. (as Paul proposes)
>
> stream
>.branch(predicate1, ks ->...)
>.branch(predicate2, ks->...)
>.defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> noDefault() return void
>
> PROS: Generally follows the way KStreams interface is defined.
>
> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> noDefault()). And for a user it is very easy to miss the fact that one
> of the terminal methods should be called. If these methods are not
> called, we can throw an exception at runtime.
>
> Colleagues, what are your thoughts? Can we do better?
>
> Regards,
>
> Ivan
>
> 27.03.2019 18:46, Ivan Ponomarev пишет:
> >
> >
> > 25.03.2019 17:43, Ivan Ponomarev пишет:
> >> Paul,
> >>
> >> I see your point when you are talking about
> >> stream..branch..branch...default..
> >>
> >> Still, I believe that this cannot be implemented the easy way.
> >> Maybe we all should think further.
> >>
> >> Let me comment on two of your ideas.
> >>
> >>> user could specify a terminal method that assumes nothing will reach
> >>> the default branch,
> >> throwing an exception if such a case occurs.
> >>
> >> 1) OK, apparently this should not be the only option besides
> >> `default`, because there are scenarios when we want to just silently
> >> drop the messages that didn't match any predicate. 2) Throwing an
> >> exception in the middle of data flow processing looks like a bad idea.
> >> In stream processing paradigm, I would prefer to emit a special
> >> message to a dedicated stream. This is exactly where `default` can be
> >> used.
> >>
> >>> it would be fairly easy for the InternalTopologyBuilder to track
> >>> dangling
> >> branches that haven't been terminated and raise a clear error before it
> >> becomes an issue.
> >>
> >> You mean a runtime exception, when the program is compiled and run?
> >> Well,  I'd prefer an API that simply won't compile if used
> >> incorrectly. Can we build such an API as a method chain starting from
> >> KStream object? There is a huge cost difference between runtime and
> >> compile-time errors. Even if a failure is uncovered instantly by unit
> >> tests, it costs more for the project than a compilation failure.
> >>
> >> Regards,

Re: MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-27 Thread Paul Whalen
John,

You make a good case for it already being a public API, so my nerves are
definitely eased on that front. I do think we have a path to move forward
with the user space solution, and if I get a chance, I'm going to try
proving it out with a trivial demo using an early MM2 build - but it's nice
to hear your support of the use case regardless.  The ACL concern makes a
lot of sense, and while I don't think it would be a deal breaker because of
what you say about advanced control naturally requiring extra care, I'm
generally against the added complexity of custom topic naming unless we
really need it.  It looks like MM2 will also support optional ACL
mirroring, so that should only make things easier.

Regarding the management burden of doing these switchovers: fortunately our
case is something like running in pre-prod maybe 3 consecutive days out of
the month, and just prod for the rest of the month.  So if it wasn't the
most effortless or fast process we could tolerate it.  Though if it was
easy I wouldn't be surprised if others wanted a similar workflow with much
faster iteration - spinning up a new environment with the same data as prod
is always a boon.

Thanks again!
Paul

On Wed, Mar 27, 2019 at 2:17 PM John Roesler  wrote:

> Hi Paul,
>
> Sorry for overlooking the "offset translation" MM2 feature. I'm glad
> Ryanne was able to confirm this would work.
>
> I'm just one voice, but FWIW, I think that the internal topic naming
> scheme is a public API. We document the structure of the naming
> scheme in several places. We also recommend making use of the fact
> that the applicationId is a prefix of the topic name in conjunction with
> Kafka Broker ACLs to grant access to the internal topics to the
> applications that own them.
>
> Actually, for this latter reason, I'm concerned that giving more control
> over the names of internal topics might make topic security and
> access control more difficult. Or maybe this concern is off-base, and
> folks who take advanced control over the topic name would also take
> on the responsibility to make sure their naming scheme works in
> conjunction with their broker configs.
>
> For whatever reason, I hadn't considered prefixing the application's
> id with "pre-prod.". Offhand, I think this would achieve the desired
> outcome. There may be some devil in the details, of course.
>
>
> Glad to hear, by the way, that you've already considered the problem
> of concurrent modifications to the changelogs (etc.). It sounds like
> your plan should work, although it might become a management burden
> if you start wanting to run a lot of these stream-app tests. In that case,
> you could consider mirroring the relevant topics *again* into a
> test-specific
> prefix (like "pre-prod.test-1."), up to some point. Then, you could stop
> the mirror, run the test, verify the results, and then just delete the
> whole test dataset.
>
>
> Does it seem like you have a good path forward? From what I'm
> hearing, the "user-space" approach is at least worth exploring before
> considering a new API. Of course, if it doesn't pan out for whatever
> reason,
> I'd (personally) support adding whatever features are necessary to support
> your use case.
>
> Thanks,
> -John
>
>
>
> On Mon, Mar 25, 2019 at 9:40 PM Paul Whalen  wrote:
>
> > John and Ryanne,
> >
> > Thanks for the responses! I think Ryanne's way of describing the question
> > is actually a much better summary than my long winded description: "a
> > Streams app can switch between topics with and without a cluster alias
> > prefix when you migrate between prod and pre-prod, while preserving
> state."
> >
> > To address a few of John's points...
> >
> > But, the prod app will still be running, and its changelog will still be
> > > mirrored into pre-prod when you start the pre-prod app.
> > >
> > The idea is actually to turn off the mirroring from prod to pre-prod
> during
> > this period, so the environments can operate completely independently and
> > their state can comfortably diverge during the testing period.  After the
> > testing period we'd be happy to throw away everything in pre-prod and
> start
> > mirroring again from prod with a blank slate.
> >
> > Also, the pre-prod app won't be in the same consumer group as the prod
> app,
> > > so it won't know from what offset to start processing input.
> > >
> > This is where I'm hoping the magic of MM2 will come in - at the time we
> > shut off mirroring from prod to pre-prod in order to spin off the pre-prod
> > environment

Re: MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-25 Thread Paul Whalen
John and Ryanne,

Thanks for the responses! I think Ryanne's way of describing the question
is actually a much better summary than my long winded description: "a
Streams app can switch between topics with and without a cluster alias
prefix when you migrate between prod and pre-prod, while preserving state."

To address a few of John's points...

But, the prod app will still be running, and its changelog will still be
> mirrored into pre-prod when you start the pre-prod app.
>
The idea is actually to turn off the mirroring from prod to pre-prod during
this period, so the environments can operate completely independently and
their state can comfortably diverge during the testing period.  After the
testing period we'd be happy to throw away everything in pre-prod and start
mirroring again from prod with a blank slate.

Also, the pre-prod app won't be in the same consumer group as the prod app,
> so it won't know from what offset to start processing input.
>
This is where I'm hoping the magic of MM2 will come in - at the time we
shut off mirroring from prod to pre-prod in order to spin off the pre-prod
environment, we will do an "offset translation" with RemoteClusterUtils
like Ryanne mentioned, so new Streams apps in pre-prod will see consumer
offsets that make sense for reading from pre-prod topics.

I like both of your ideas around the "user space" solution: subscribing to
multiple topics, or choosing a topic based on config.  However, in order to
populate their internal state properly, when the pre-prod apps come up they
will need to look for repartition and changelog topics with the right
prefix.  This seems problematic to me since the user doesn't have direct
control over those topic names, though it did just occur to me now that the
user *sort of* does.  Since the naming scheme is currently just
applicationId + "-" + storeName + "-changelog", we could translate the
consumer group offsets to a consumer group with a new name that has the
same prefix as the mirrored topics do.  That seems a bit clumsy/lucky to
me (is the internal topic naming convention really a "public API"?), but I
think it would work.
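Roughly, I picture the switchover step like this (a sketch only - 
RemoteClusterUtils is from the in-progress MM2 work so the exact API may change, 
and preProdClientConfig is a Map with the pre-prod bootstrap servers):

// translate prod's committed offsets into pre-prod coordinates
Map<TopicPartition, OffsetAndMetadata> translated =
    RemoteClusterUtils.translateOffsets(preProdClientConfig, "prod", "my-app-id",
        Duration.ofMinutes(1));

// commit them under a group id carrying the same prefix as the mirrored topics
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "pre-prod-broker:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "prod.my-app-id");
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
        props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
    consumer.assign(translated.keySet());
    consumer.commitSync(translated);
}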

I'd be curious to hear if folks think that solution would work and be an
acceptable pattern, since my original proposal of more user control of
internal topic naming did seem a bit heavy handed.

Thanks very much for your help!
Paul

On Mon, Mar 25, 2019 at 3:14 PM Ryanne Dolan  wrote:

> Hey Paul, thanks for the kind words re MM2.
>
> I'm not a Streams expert first off, but I think I understand your question:
> if a Streams app can switch between topics with and without a cluster alias
> prefix when you migrate between prod and pre-prod, while preserving state.
> Streams supports regexes and lists of topics as input, so you can use e.g.
> builder.stream(List.of("topic1", "prod.topic1")), which is a good place to
> start. In this case, the combined subscription is still a single stream,
> conceptually, but comprises partitions from both topics, i.e. partitions
> from topic1 plus partitions from prod.topic1. At a high level, this is no
> different than adding more partitions to a single topic. I think any
> intermediate or downstream topics/tables would remain unchanged, since they
> are still the result of this single stream.
>
> The trick is to correctly translate offsets for the input topics when
> migrating the app between prod and pre-prod, which RemoteClusterUtils can
> help with. You could do this with external tooling, e.g. a script
> leveraging RemoteClusterUtils and kafka-streams-application-reset.sh. I
> haven't tried this with a Streams app myself, but I suspect it would work.
>
> Ryanne
>
>
> On Sun, Mar 24, 2019 at 12:31 PM Paul Whalen  wrote:
>
> > Hi all,
> >
> > With MirrorMaker 2.0 (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> > )
> > accepted and coming along very nicely in development, it has got me
> > wondering if a certain use case is supported, and if not, can changes be
> > made to Streams or MM2 to support it.  I'll explain the use case, but the
> > TL;DR here is "do we need more control over topic naming in MM2 or
> > Streams?"
> >
> > My team foresees using MM2 as a way to mirror data from our prod
> > environment to a pre-prod environment.  The data is supplied by external
> > vendors, introduced into our system through a Kafka Streams ETL pipeline,
> > and consumed by our end-applications.  Generally we would only like to
> run
> > the ETL pipeline in prod since there is an operational cost to running it
> > in both prod and pre-prod (the data sometimes needs manual attention).
> >

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-24 Thread Paul Whalen
Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.
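Roughly, with noDefaultBranch() as the strawman name:

stream.branch()
    .addBranch(isCoffee, this::handleCoffee)
    .addBranch(isElectronics, this::handleElectronics)
    .noDefaultBranch();  // a record matching no predicate would throw rather than silently drop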

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.
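Purely illustrative, the check could be as simple as (danglingBranches being 
whatever bookkeeping the builder keeps):

if (!danglingBranches.isEmpty()) {
    throw new TopologyException(
        "branch() without a terminal defaultBranch()/noDefaultBranch(): " + danglingBranches);
}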

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev  wrote:

> Hello Paul,
>
> I'm afraid this won't work because we do not always need the
> defaultBranch. And without a terminal operation we don't know when to
> finalize and build the 'branch switch'.
>
> In my proposal, onTopOf returns its argument, so we can do something
> more with the original branch after branching.
>
> I understand your point that the need for special object construction
> contrasts the fluency of most KStream methods. But here we have a
> special case: we build the switch to split the flow, so I think this is
> still idiomatic.
>
> Regards,
>
> Ivan
>
>
>
> 24.03.2019 4:02, Paul Whalen пишет:
> > Ivan,
> >
> > I think it's a great idea to improve this API, but I find the onTopOf()
> > mechanism a little confusing since it contrasts the fluency of other
> > KStream method calls.  Ideally I'd like to just call a method on the
> stream
> > so it still reads top to bottom if the branch cases are defined fluently.
> > I think the addBranch(predicate, handleCase) is very nice and the right
> way
> > to do things, but what if we flipped around how we specify the source
> > stream.
> >
> > Like:
> >
> > stream.branch()
> >  .addBranch(predicate1, this::handle1)
> >  .addBranch(predicate2, this::handle2)
> >  .defaultBranch(this::handleDefault);
> >
> > Where branch() returns a KBranchedStreams or KStreamBrancher or
> something,
> > which is added to by addBranch() and terminated by defaultBranch() (which
> > returns void).  This is obviously incompatible with the current API, so
> the
> > new stream.branch() would have to have a different name, but that seems
> > like a fairly small problem - we could call it something like branched()
> or
> > branchedStreams() and deprecate the old API.
> >
> > Does this satisfy the motivations of your KIP?  It seems like it does to
> > me, allowing for clear in-line branching while also allowing you to
> > dynamically build up branches off of KBranchedStreams if desired.
> >
> > Thanks,
> > Paul
> >
> >
> >
> > On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> 
> > wrote:
> >
> >> Hi Bill,
> >>
> >> Thank you for your reply!
> >>
> >> This is how I usually do it:
> >>
> >> void handleFirstCase(KStream ks){
> >>  ks.filter().mapValues(...)
> >> }
> >>
> >>
> >> void handleSecondCase(KStream ks){
> >>  ks.selectKey(...).groupByKey()...
> >> }
> >>
> >> ..
> >> new KafkaStreamsBrancher()
> >> .addBranch(predicate1, this::handleFirstCase)
> >> .addBranch(predicate2, this::handleSecondCase)
> >> .onTopOf()
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >> 22.03.2019 1:34, Bill Bejeck пишет:
> >>> Hi Ivan,
> >>>
> >>> Thanks for the KIP.
> >>>
> >>> I have one question, the KafkaStreamsBrancher takes a Consumer as a
> >> second
> >>> argument which returns nothing, and the example in the KIP shows each
> >>> stream from the branch using a terminal node (KafkaStreams#to() in this
>

MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-24 Thread Paul Whalen
Hi all,

With MirrorMaker 2.0 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0)
accepted and coming along very nicely in development, it has got me
wondering if a certain use case is supported, and if not, can changes be
made to Streams or MM2 to support it.  I'll explain the use case, but the
TL;DR here is "do we need more control over topic naming in MM2 or Streams?"

My team foresees using MM2 as a way to mirror data from our prod
environment to a pre-prod environment.  The data is supplied by external
vendors, introduced into our system through a Kafka Streams ETL pipeline,
and consumed by our end-applications.  Generally we would only like to run
the ETL pipeline in prod since there is an operational cost to running it
in both prod and pre-prod (the data sometimes needs manual attention).
This seems to fit MM2 well: pre-prod end-applications consume from the
pre-prod Kafka cluster, which is entirely "remote" topics being mirrored
from the prod cluster.  We only have to keep one instance of the ETL
pipeline running, but end-applications can be separate, connecting to their
respective prod and pre-prod Kafka clusters.

However, when we want to test changes to the ETL pipeline itself, we would
like to turn off the mirroring from prod to pre-prod, and run the ETL
pipeline also in pre-prod, picking up the most recent state of the prod
pipeline from when mirroring was turned off (FWIW, downtime is not an issue
for our use case).

My question/concern is basically, can Streams apps work when they're
running against topics prepended with a cluster alias, like
"pre-prod.App-statestore-changelog" as is the plan with MM2. From what I
can tell the answer is no, and my proposal would be to give the Streams
user more specific control over how Streams names its internal topics
(repartition and changelogs) by defining an "InternalTopicNamingStrategy"
or similar.  Perhaps there is a solution on the MM2 side as well, but it
seems much less desirable to budge on that convention.
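To make that concrete, the hook could look something like this (entirely 
hypothetical - both the name and the methods are strawmen):

public interface InternalTopicNamingStrategy {
    // default today is effectively applicationId + "-" + storeName + "-changelog"
    String changelogTopicName(String applicationId, String storeName);

    // default today is effectively applicationId + "-" + operatorName + "-repartition"
    String repartitionTopicName(String applicationId, String operatorName);
}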

I phrased the question in terms of my team's problem, but it's worth noting
that this use case is passably similar to a potential DR use case, where
there is a DR cluster that is normally just being mirrored to by MM2, but
in a DR scenario would become the active cluster that Streams applications
are connected to.

Thanks for considering this issue, and great job to those working on MM2 so
far!

Paul


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-23 Thread Paul Whalen
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
.addBranch(predicate1, this::handle1)
.addBranch(predicate2, this::handle2)
.defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build up branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev 
wrote:

> Hi Bill,
>
> Thank you for your reply!
>
> This is how I usually do it:
>
> void handleFirstCase(KStream ks){
> ks.filter().mapValues(...)
> }
>
>
> void handleSecondCase(KStream ks){
> ks.selectKey(...).groupByKey()...
> }
>
> ..
> new KafkaStreamsBrancher()
>.addBranch(predicate1, this::handleFirstCase)
>.addBranch(predicate2, this::handleSecondCase)
>.onTopOf()
>
> Regards,
>
> Ivan
>
> 22.03.2019 1:34, Bill Bejeck пишет:
> > Hi Ivan,
> >
> > Thanks for the KIP.
> >
> > I have one question, the KafkaStreamsBrancher takes a Consumer as a
> second
> > argument which returns nothing, and the example in the KIP shows each
> > stream from the branch using a terminal node (KafkaStreams#to() in this
> > case).
> >
> > Maybe I've missed something, but how would we handle the case where the
> > user has created a branch but wants to continue processing and not
> > necessarily use a terminal node on the branched stream immediately?
> >
> > For example, using today's logic as is if we had something like this:
> >
> > KStream[] branches = originalStream.branch(predicate1,
> > predicate2);
> > branches[0].filter().mapValues(...)..
> > branches[1].selectKey(...).groupByKey().
> >
> >
> > Thanks!
> > Bill
> >
> >
> >
> > On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:
> >
> >> All,
> >>
> >> I'd like to jump-start the discussion for KIP- 418.
> >>
> >> Here's the original message:
> >>
> >> Hello,
> >>
> >> I'd like to start a discussion about KIP-418. Please take a look at the
> >> KIP if you can, I would appreciate any feedback :)
> >>
> >> KIP-418:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>
> >> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>
> >> PR#6164: https://github.com/apache/kafka/pull/6164
> >>
> >> Regards,
> >>
> >> Ivan Ponomarev
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-23 Thread Paul Whalen
I'd like to resurrect this discussion with a cursory, proof-of-concept
implementation of the KIP which combines many of our ideas:
https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
small as possible for now, just using it to convey the main ideas.  But
I'll separately address some of our earlier discussion:

   - Will there be a new, separate interface for users to implement for the
   new functionality? No, to hopefully keep things simple, all of the
   Processor/TransformerSupplier interfaces will just extend
   StateStoresSupplier, allowing users to opt in to this functionality by
   overriding the default implementation that gives an empty list (see the
   sketch after this list).
   - Will the interface allow users to specify the store name, or the
   entire StoreBuilder? The entire StoreBuilder, so the
   Processor/TransformerSupplier can completely encapsulate name and
   implementation of a state store if desired.
   - Will the old way of specifying store names alongside the supplier when
   calling stream.process/transform() be deprecated? No, this is still a
   legitimate way to wire up Processors/Transformers and their stores. But I
   would recommend not allowing stream.process/transform() calls that use both
   store declaration mechanisms (this restriction is not in the proof of
   concept)
   - How will we handle adding the same state store to the topology
   multiple times because different Processor/TransformerSuppliers declare it?
   topology.addStateStore() will be slightly relaxed for convenience, and will
   allow adding the same StoreBuilder multiple times as long as the exact same
   StoreBuilder instance is being added for the same store name.  In practice
   this seems to prevent the issue of accidentally collapsing two state stores
   into one by adding them under the same name.  For additional safety, if we wanted to
   (not in the proof of concept), we could allow for this relaxation only for
   internal callers of topology.addStateStore().
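Sketching the opt-in from the first bullet (my shorthand here - the PR is the
source of truth, so names and generics may differ):

public interface StateStoresSupplier {
    // default: no owned stores, so existing suppliers are unaffected
    default List<StoreBuilder<?>> stateStores() {
        return Collections.emptyList();
    }
}

// a supplier that "owns" its store just overrides the default, e.g.:
@Override
public List<StoreBuilder<?>> stateStores() {
    return Collections.singletonList(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"),
            Serdes.String(), Serdes.String()));
}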

So, in summary, the use cases look like:

   - 1 transformer/processor that owns its store: Using the new
   StateStoresSupplier interface method to supply its StoreBuilders that will
   be added to the topology automatically.
   - Multiple transformer/processors that share the same store: Either


   1. The old way: the StoreBuilder is defined "far away" from the
   Transformer/Processor implementations, and is added to the topology
   manually by the user
   2. The new way: the StoreBuilder is defined closer to the
   Transformer/Processor implementations, and the same instance is returned by
   all Transformer/ProcessorSuppliers that need it


This makes the KIP wiki a bit stale; I'll update if we want to bring this
design to a vote.

Thanks!
Paul

On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang  wrote:

> Matthias / Paul,
>
> The concern I had about introducing `StoreBuilderSupplier` is simply
> because it is another XXSupplier to the public API, so I'd like to ask if
> we really have to add it :)
>
> The difference between encapsulating the store name and encapsulating the
> full state store builder is that, in the former:
>
> ---
>
> String storeName = "store1";
> builder.addStore(new MyStoreBuilder(storeName));
> stream1.transform(new MyTransformerSupplier(storeName));   // following my
> proposal, that the store name can be passed in and used for both
> `listStores` and in the `Transformer#init`; so the Transformer function
> does not need to get the constant string name again.
>
>  // one caveat to admit, is that
> MyTransformerSupplier logic may be just unique to `store1` so it cannot be
> reused with a different store name anyways.
> ---
>
> While in the latter:
>
> ---
>
> stream1.transform(new MyTransformerSupplierForStore1());   // the name is
> just indicating that we may have one such supplier for each store.
>
> ---
>
> I understand the latter introduces more convenience in the API, but the
> cost is that we still cannot completely remove `builder.addStore`; we only
> reduce its semantic scope to shared state stores, so users need to
> learn two ways of creating state stores for those two patterns.
>
> My argument is that more public APIs require a longer learning curve for
> users, and introduce more usage patterns that may confuse users (the
> proposal I had tries to replace one with another completely).
>
>
> Guozhang
>
> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen  wrote:
>
> > Thanks for the great thoughts Matthias and Guozhang!
> >
> > If I'm not mistaken, Guozhang's suggestion is what my second alternative
> on
> > the KIP is ("Have the added method on the Supplier interfaces only return
> > store names, not builders").  I do think it would be a worthw

[jira] [Created] (KAFKA-7941) Connect KafkaBasedLog work thread terminates when getting offsets fails because broker is unavailable

2019-02-17 Thread Paul Whalen (JIRA)
Paul Whalen created KAFKA-7941:
--

 Summary: Connect KafkaBasedLog work thread terminates when getting 
offsets fails because broker is unavailable
 Key: KAFKA-7941
 URL: https://issues.apache.org/jira/browse/KAFKA-7941
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Paul Whalen
Assignee: Paul Whalen


My team has run into this Connect bug regularly in the last six months while 
doing infrastructure maintenance that causes intermittent broker availability 
issues.  I'm a little surprised it exists given how routinely it affects us, so 
perhaps someone in the know can point out if our setup is somehow just 
incorrect.  My team is running 2.0.0 on both the broker and client, though from 
what I can tell from reading the code, the issue continues to exist through 
2.2; at least, I was able to write a failing unit test that I believe 
reproduces it.

When a {{KafkaBasedLog}} worker thread in the Connect runtime calls 
{{readLogToEnd}} and brokers are unavailable, the {{TimeoutException}} from the 
consumer {{endOffsets}} call is uncaught all the way up to the top level 
{{catch (Throwable t)}}, effectively killing the thread until restarting 
Connect.  The result is Connect stops functioning entirely, with no indication 
except for that log line - tasks still show as running.

The proposed fix is to simply catch and log the {{TimeoutException}}, allowing 
the worker thread to retry forever.
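A sketch of the fix (simplified - names approximate the real {{KafkaBasedLog}} 
work thread):
{code:java}
// inside the work thread's loop:
try {
    readToLogEnd();
} catch (TimeoutException e) {
    // brokers may be temporarily unavailable; log and retry instead of letting
    // the exception escape to the top-level catch (Throwable), which kills the thread
    log.warn("Timeout while reading log to end for topic {}, retrying", topic, e);
    continue;
}
{code}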

Alternatively, perhaps there is not an expectation that Connect should be able 
to recover following broker unavailability, though that would be disappointing. 
 I would at least hope for a louder failure than the single {{ERROR}} log.





Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-13 Thread Paul Whalen
+1 non binding.  I haven't contributed at all to discussion but have
followed since Adam reinvigorated it a few months ago and am very excited
about it.  It would be a huge help on the project I'm working on.

On Fri, Jan 11, 2019 at 9:05 AM Adam Bellemare 
wrote:

> Thanks all -
>
> So far that's +2 Binding, +2 non-binding
>
> If we get a few more votes I can likely get this out as part of the Kafka
> 2.2 release, as the KIP Freeze is Jan 24, 2019. The current PR I have could
> be modified to match the PR in short order.
>
> Adam
>
>
> On Fri, Jan 11, 2019 at 7:11 AM Damian Guy  wrote:
>
> > +1 binding
> >
> > On Thu, 10 Jan 2019 at 16:57, Bill Bejeck  wrote:
> >
> > > +1 from me.  Great job on the KIP.
> > >
> > > -Bill
> > >
> > > On Thu, Jan 10, 2019 at 11:35 AM John Roesler 
> wrote:
> > >
> > > > It's a +1 (nonbinding) from me as well.
> > > >
> > > > Thanks for sticking with this, Adam!
> > > > -John
> > > >
> > > > On Wed, Jan 9, 2019 at 6:22 PM Guozhang Wang 
> > wrote:
> > > >
> > > > > Hello Adam,
> > > > >
> > > > > I'm +1 on the current proposal, thanks!
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare <
> > > adam.bellem...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All
> > > > > >
> > > > > > I would like to call a new vote on KIP-213. The design has
> changed
> > > > > > substantially. Perhaps more importantly, the KIP and associated
> > > > > > documentation has been greatly simplified. I know this KIP has
> been
> > > on
> > > > > the
> > > > > > mailing list for a long time, but the help from John Roesler and
> > > > Guozhang
> > > > > > Wang have helped put it into a much better state. I would
> > appreciate
> > > > any
> > > > > > feedback or votes.
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> > > > > >
> > > > > >
> > > > > >
> > > > > > Thank you
> > > > > >
> > > > > > Adam Bellemare
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-16 Thread Paul Whalen
> > return Collections.singletonList(storeName);
> > }
> > }
> >
> > Basically, we move the parameters from the caller of `transform` to
> inside
> > the TransformerSuppliers. DSL implementations would not change much, simply
> > calling `connectStateStore` by getting the list of names from the
> provided
> > function.
> >
> > Guozhang
> >
> >
> > On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax 
> > wrote:
> >
> >> Just a meta comment: do we really need to deprecate existing
> >> `transform()` etc methods?
> >>
> >> The last argument is a vararg, and thus, just keeping the existing API
> >> for this part seems to work too, allowing us to implement both patterns?
> >>
> >> Also, instead of adding a default method, we could also add a new
> >> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
> >> stateStores()` -- users could implement `TransformerSupplier` and
> >> `StoreBuilderSupplier` at once; and for this case, we require that users
> >> don't provide store name in `transform()`.
> >>
> >> Similar, we could add an interface `StoreNameSupplier` with method
> >> `List<String> stateStores()`. This allows us to "auto-wire" a transformer
> >> to existing stores (to avoid the issue to add the same store multiple
> >> times).
> >>
> >> Hence, for shared stores, there would be one "main" transformer that
> >> implements `StoreBuilderSupplier` and that must be added first to the
> >> topology. The other transformers would implement `StoreNameSupplier` and
> >> just connect to those stores.
> >>
> >> Another possibility to avoid the issue of adding the same stores
> >> multiple times would be that the DSL always calls `addStateStore()` but
> >> catches a potential "store exists already" exception and falls back to
> >> `connectProcessorAndStateStore()` for this case. Thus, we would not need
> >> the `StoreNameSupplier` interface and the order in which transformers
> >> are added would not matter either. The only disadvantage I see might be
> >> potential bugs about sharing state if two different stores are named the
> >> same by mistake (this would not be detected).
> >>
> >>
> >>
> >> Just some ideas I wanted to share. What do you think?
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 12/11/18 3:46 AM, Paul Whalen wrote:
> >>> Ah yes of course, this was an oversight, I completely ignored the
> >> multiple
> >>> processors sharing the same state store when writing up the KIP.  Which
> >> is
> >>> funny, because I've actually done this (different processors sharing
> >> state
> >>> stores) a fair amount myself, and I've settled on a pattern where I
> group
> >>> the Processors in an enclosing class, and that enclosing class handles
> as
> >>> much as possible.  Here's a gist showing the rough structure, just for
> >>> context:
> >> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
> >>> . Note how it adds the stores to the topology, as well as providing a
> >>> public method with the store names.
> >>>
> >>> I don't think my proposal completely conflicts with the multiple
> >> processors
> >>> sharing state stores use case, since you can create a supplier that
> >>> provides the store name you want, somewhat independently of your actual
> >>> Processor logic.  The issue I do see though, is that
> >>> topology.addStateStore() can only be called once for a given store.  So
> >> for
> >>> your example, if there was a single TransformerSupplier that was
> >> passed
> >>> into both transform() calls, "store1" would be added (under the hood)
> to
> >>> the topology twice, which is no good.
> >>>
> >>> Perhaps this suggests that one of my alternatives on the KIP might be
> >>> desirable: either not having the suppliers return StoreBuilders (just
> >> store
> >>> names), or not deprecating the old methods that take "String...
> >>> stateStoreNames". I'll have to think about it a bit.
> >>>
> >>> Paul
> >>>
> >>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang 
> >> wrote:
> >>>
> >>>> Hello Paul

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-10 Thread Paul Whalen
Ah yes of course, this was an oversight, I completely ignored the multiple
processors sharing the same state store when writing up the KIP.  Which is
funny, because I've actually done this (different processors sharing state
stores) a fair amount myself, and I've settled on a pattern where I group
the Processors in an enclosing class, and that enclosing class handles as
much as possible.  Here's a gist showing the rough structure, just for
context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
. Note how it adds the stores to the topology, as well as providing a
public method with the store names.
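The rough shape of that pattern (illustrative only, not copied from the gist):

class PurchaseProcessors {
    private static final String STORE_NAME = "purchase-store";

    // the enclosing class owns adding its store(s) to the topology...
    void addStateStores(StreamsBuilder builder) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME),
                Serdes.String(), Serdes.String()));
    }

    // ...and exposes the names so callers can pass them to transform()/process()
    String[] stateStoreNames() {
        return new String[] { STORE_NAME };
    }
}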

I don't think my proposal completely conflicts with the multiple processors
sharing state stores use case, since you can create a supplier that
provides the store name you want, somewhat independently of your actual
Processor logic.  The issue I do see though, is that
topology.addStateStore() can only be called once for a given store.  So for
your example, if there was a single TransformerSupplier that was passed
into both transform() calls, "store1" would be added (under the hood) to
the topology twice, which is no good.

Perhaps this suggests that one of my alternatives on the KIP might be
desirable: either not having the suppliers return StoreBuilders (just store
names), or not deprecating the old methods that take "String...
stateStoreNames". I'll have to think about it a bit.

Paul

On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang  wrote:

> Hello Paul,
>
> Thanks for the great writeup (very detailed and crystal-clear motivation
> sections!).
>
> This is quite an interesting idea and I do like the API cleanness you
> proposed. The original motivation of letting StreamsTopology to add state
> stores though, is to allow different processors to share the state store.
> For example:
>
> builder.addStore("store1");
>
> // a path of stream transformations that leads to KStream stream1.
> stream1.transform(..., "store1");
>
> // another path that generates a KStream stream2.
> stream2.transform(..., "store1");
>
> Behind the scenes, Streams will make sure stream1 / stream2 transformations
> will always be grouped together as a single group of tasks, each of which
> will be executed by a single thread, and hence there are no concurrency issues
> when accessing the store from different operators within the same task. I'm
> not sure how common this use case is, but I'd like to hear if you have any
> thoughts on maintaining this, since the current proposal seems to exclude this
> possibility.
>
>
> Guozhang
>
>
> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen  wrote:
>
> > Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> > think could greatly increase the usability of the low-level processor
> API.
> > I have some code written but will wait to see if there is buy in before
> > going all out and creating a pull request.  It seems like most of the
> work
> > would be in updating documentation and tests.
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >
> > Thanks!
> > Paul
> >
>
>
> --
> -- Guozhang
>


[DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-09 Thread Paul Whalen
Here's KIP-401 for discussion, a minor Kafka Streams API change that I
think could greatly increase the usability of the low-level processor API.
I have some code written but will wait to see if there is buy in before
going all out and creating a pull request.  It seems like most of the work
would be in updating documentation and tests.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

Thanks!
Paul


KIP creation permission

2018-12-07 Thread Paul Whalen
Based on some other messages, the mailing list seems like the best place to ask
for permissions. Can it be granted on the wiki for pgwhalen?
Gonna try to write something up for this:
https://issues.apache.org/jira/browse/KAFKA-7523

Thanks!
Paul


[jira] [Created] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-10-19 Thread Paul Whalen (JIRA)
Paul Whalen created KAFKA-7523:
--

 Summary: TransformerSupplier/ProcessorSupplier enhancements
 Key: KAFKA-7523
 URL: https://issues.apache.org/jira/browse/KAFKA-7523
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Paul Whalen


I have found that when writing "low level" {{Processors}} and {{Transformers}} 
that are stateful, often I want these processors to "own" one or more state 
stores, the details of which are not important to the business logic of the 
application.  However, when incorporating these into the topologies defined by 
the high level API, using {{KStream::transform}} or {{KStream::process}}, I'm 
forced to specify the stores so the topology is wired up correctly.  This 
creates an unfortunate pattern where the {{TransformerSupplier}} or 
{{ProcessorSupplier}}, which (according to the pattern I've been following) holds 
the information about the name of the state stores, must be defined above the 
"high level" "fluent API"-style pipeline, which makes it hard to understand the 
business logic data flow.

 

What I currently have to do:
{code:java}
TransformerSupplier transformerSupplier =
    new TransformerSupplierWithState(topology, val -> businessLogic(val));
builder.stream("in.topic")
    .transform(transformerSupplier, transformerSupplier.stateStoreNames())
    .to("out.topic");{code}
I have to both define the {{TransformerSupplier}} above the "fluent block", and 
pass the topology in so I can call {{topology.addStateStore()}} inside the 
{{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what the 
state store names are for that point in the topology. The lambda {{val -> 
businessLogic(val)}} is really what I want to see in-line because that's the 
crux of what is happening, along with the name of some factory method 
describing what the transformer is doing for me internally. This issue is 
obviously exacerbated when the "fluent block" is much longer than this example 
- It gets worse the farther away {{val -> businessLogic(val)}} is from 
{{KStream::transform}}.

 
An improvement:
{code:java}
builder.stream("in.topic")
    .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
    .to("out.topic");{code}
Which implies the existence of a {{KStream::transform}} that takes a single 
argument that adheres to this interface:
{code:java}
interface TransformerSupplierWithState {
    Transformer get();
    String[] stateStoreNames();
}{code}
Or better yet, I wouldn't have to pass in the topology, the caller of 
{{TransformerSupplierWithState}} could also handle the job of "adding" its 
state stores to the topology:
{code:java}
interface TransformerSupplierWithState {
    Transformer get();
    Map<String, StoreBuilder> stateStores();
}{code}
Which would enable my ideal:
{code:java}
builder.stream("in.topic")
    .transform(transformerSupplierWithState(val -> businessLogic(val)))
    .to("out.topic");{code}
I think this would be a huge improvement in the usability of low-level 
processors with the high-level DSL.

Please let me know if I'm missing something as to why this cannot or should not 
happen, or if there is a better forum for this suggestion (presumably it would 
require a KIP?). I'd be happy to build it as well if there is a chance of it 
being merged, it doesn't seem like a huge challenge to me.


