Hello John,

Thanks for your detailed explanation. I've done some quick checks on some
existing examples that heavily use Processor, and the results also make me
worried about my previous statement that "the breakage would not be big".
I agree we should maintain compatibility.

About the naming itself, I'm actually a bit more inclined toward sub-packages
than renamed new classes. My motivation is that our current packaging is
already quite coarse-grained and sometimes ill-placed, and hence maybe we
can take this change along with some cleanup of packages (but again, we
should follow the deprecate-then-remove path). What I'm thinking is:

-------------------

processor/: StateRestoreCallback/AbstractNotifyingRestoreCallback, (deprecated
later, same meaning for other cross-throughs), ProcessorContext,
RecordContext, Punctuator, PunctuationType, To, Cancellable (are the only
things left)

(new) processor/api/: Processor, ProcessorSupplier (and of course, these
two classes can be strongly typed)

state/: StateStore, BatchingStateRestoreCallback,
AbstractNotifyingBatchingRestoreCallback (moved from processor/),
PartitionGrouper, WindowStoreIterator, StateSerdes (this one can be moved
into state/internals), TimestampedBytesStore (we can move this to internals
since store types would use ValueAndTimestamp by default, see below), ValueAndTimestamp

(new) state/factory/: Stores, StoreBuilder, StoreSupplier; *BUT* the new
Stores would not have timestampedXXBuilder APIs since the default
StoreSupplier / StoreBuilder value types are ValueAndTimestamp already.

(new) state/queryable/: QueryableStoreType, QueryableStoreTypes, HostInfo

(new) state/keyValue/: KeyValueXXX classes, and also the same for
state/sessionWindow and state/timeWindow; *BUT* here we use
ValueAndTimestamp as value types of those APIs directly, and also
TimestampedKeyValue/WindowStore would be deprecated.

(new) kstream/api/: KStream, KTable, GroupedKStream (renamed from
KGroupedStream), GroupedKTable (renamed from KGroupedTable),
TimeWindowedKStream, SessionWindowedKStream, GlobalKTable

(new) kstream/operator/: Aggregator, ForeachFunction,  ... , Merger and
Grouped, Joined, Materialized, ... , Printed and Transformer,
TransformerSupplier.

(new) kstream/window/: Window, Windows, Windowed, TimeWindows,
SessionWindows, UnlimitedWindows, JoinWindows, WindowedSerdes,
Time/SessionWindowedSerializer/Deserializer.

(new) configure/: RocksDBConfigSetter, TopicNameExtractor,
TimestampExtractor, UsePreviousTimeOnInvalidTimestamp,
WallclockTimestampExtractor, ExtractRecordMetadataTimestamp,
FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, StateRestoreListener

(new) metadata/: StreamsMetadata, ThreadMetadata, TaskMetadata, TaskId

Still, any xxx/internals packages are declared as internal classes, but other
xxx/yyy packages are declared as public APIs.

-------------------
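
To make the (new) processor/api/ idea above a bit more concrete, here is a
minimal, self-contained sketch of what a strongly typed Processor could look
like. It is only an illustration under assumptions: the two interfaces are
simplified, hypothetical stand-ins (not the actual Kafka Streams or KIP
signatures), and LengthProcessor and run() are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

public class TypedProcessorSketch {

    /** Hypothetical typed context: forward() only accepts the declared output types. */
    interface ProcessorContext<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    /** Hypothetical stand-in for a processor/api/Processor with input and output type bounds. */
    interface Processor<KIn, VIn, KOut, VOut> {
        void init(ProcessorContext<KOut, VOut> context);

        void process(KIn key, VIn value);
    }

    /** Example processor: consumes <String, String> and forwards <String, Integer>. */
    static class LengthProcessor implements Processor<String, String, String, Integer> {
        private ProcessorContext<String, Integer> context;

        @Override
        public void init(ProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            context.forward(key, value.length());
            // context.forward(key, value); // would not compile: String is not an Integer
        }
    }

    /** Runs the example against a context that records each forwarded record as "key=value". */
    static List<String> run() {
        List<String> forwarded = new ArrayList<>();
        LengthProcessor processor = new LengthProcessor();
        processor.init((key, value) -> forwarded.add(key + "=" + value));
        processor.process("a", "hello");
        processor.process("b", "hi");
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the typed interface would live in its own sub-package, the existing
processor/Processor could keep its name and signature during the deprecation
period, and a wrong-typed forward in a rarely used branch becomes a compile
error instead of a run-time failure.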

This is a very wild thought and I can totally understand if people feel
this is too much, since it definitely enlarges the scope of this KIP a lot
:) I'm just trying to play devil's advocate here: do a major refactoring and
avoid renaming the Processor classes.


Guozhang


On Fri, Jun 21, 2019 at 9:51 PM Matthias J. Sax <matth...@confluent.io>
wrote:
>
> I think `RecordProcessor` is a good name.
>
>
> -Matthias
>
> On 6/21/19 5:09 PM, John Roesler wrote:
> > After kicking the naming around a bit more, it seems like any package
> > name change is a bit "weird" because it fragments the package and
> > directory structure. If we can come up with a reasonable name for the
> > interface after all, it seems like the better choice.
> >
> > The real challenge is that the existing name "Processor" seems just
> > about perfect. In picking a new name, we need to consider the ultimate
> > state, after the deprecation period, when we entirely remove
> > Processor. In this context, TypedProcessor seems a little odd to me,
> > because it seems to imply that there should also be an "untyped
> > processor".
> >
> > After kicking around a few other ideas, what does everyone think about
> > "RecordProcessor"? I _think_ maybe it stands on its own just fine,
> > because it's a thing that processes... records?
> >
> > If others agree with this, I can change the proposal to RecordProcessor.
> >
> > Thanks,
> > -John
> >
> > On Fri, Jun 21, 2019 at 6:42 PM John Roesler <j...@confluent.io> wrote:
> >>
> >> Hi all,
> >>
> >> I've updated the KIP with the feedback so far.
> >>
> >> The naming question is still the biggest (only?) outstanding issue. It
> >> would be good to hear some more thoughts on it.
> >>
> >> As we stand now, there's one vote for changing the package name to
> >> something like 'typedprocessor', one for changing the interface to
> >> TypedProcessor (as in the PoC), and one for just changing the
> >> Processor interface in-place, breaking source compatibility.
> >>
> >> How can we resolve this decision?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Thu, Jun 20, 2019 at 5:44 PM John Roesler <j...@confluent.io> wrote:
> >>>
> >>> Thanks for the feedback, Guozhang and Matthias,
> >>>
> >>> Regarding motivation: I'll update the wiki. Briefly:
> >>> * Any processor can benefit. Imagine a pure user of the ProcessorAPI
> >>> who has very complex processing logic. I have seen several processor
> > >>> implementations that are hundreds of lines long and call
> >>> `context.forward` in many different locations and branches. In such an
> >>> implementation, it would be very easy to have a bug in a rarely used
> >>> branch that forwards the wrong kind of value. This would structurally
> >>> prevent that from happening.
> >>> * Also, anyone who heavily uses the ProcessorAPI would likely have
> >>> developed helper methods to wire together processors, just as we have
> >>> in the DSL implementation. This change would enable them to ensure at
> >>> compile time that they are actually wiring together compatible types.
> >>> This was actually _my_ original motivation, since I found it very
> >>> difficult and time consuming to follow the Streams DSL internal
> >>> builders.
> >>>
> >>> Regarding breaking the source compatibility of Processor: I would
> >>> _love_ to side-step the naming problem, but I really don't know if
> >>> it's excusable to break compatibility. I suspect that our oldest and
> >>> dearest friends are using the ProcessorAPI in some form or another,
> >>> and all their source code would break. It sucks to have to create a
> >>> whole new interface to get around this, but it feels like the right
> >>> thing to do. Would be nice to get even more feedback on this point,
> >>> though.
> >>>
> >>> Regarding the types of stores, as I said in my response to Sophie,
> >>> it's not an issue.
> >>>
> >>> Regarding the change to StreamsBuilder, it doesn't pin the types in
> >>> any way, since all the types are bounded by Object only, and there are
> >>> no extra constraints between arguments (each type is used only once in
> >>> one argument). But maybe I missed the point you were asking about.
> > >>> Since the type takes generic parameters, we should allow users to pass
> > >>> in parameterized arguments. Otherwise, they would _have to_ give us a
> > >>> raw type, and they would be forced to get a "rawtypes" warning from the
> >>> compiler. So, it's our obligation in any API that accepts a
> >>> parameterized-type parameter to allow people to actually pass a
> >>> parameterized type, even if we don't actually use the parameters.
> >>>
> >>> The naming question is a complex one, as I took pains to detail
> >>> previously. Please don't just pick out one minor point, call it weak,
> >>> and then claim that it invalidates the whole decision. I don't think
> >>> there's a clear best choice, so I'm more than happy for someone to
> >>> advocate for renaming the class instead of the package. Can you
> >>> provide some reasons why you think that would be better?
> >>>
> > >>> Regarding the deprecated methods, you're absolutely right. I'll
> > >>> update the KIP.
> >>>
> >>> Thanks again for all the feedback!
> >>> -John
> >>>
> > >>> On Thu, Jun 20, 2019 at 4:34 PM Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> >>>>
> > >>>> Just want to second what Sophie said about the stores. The type of a
> > >>>> used store is completely independent of input/output types.
> > >>>>
> > >>>> This relates to the change to the `addGlobalStore()` method. Why do you
> > >>>> want to pin the types? In fact, people request the ability to filter()
> > >>>> and maybe even map() the data before they are put into the global
> > >>>> store. Limiting the types seems to be a step backward here?
> >>>>
> >>>>
> >>>>
> > >>>> Also, the package name is questionable.
> > >>>>
> > >>>>> This wouldn't be the first project to do something like this...
> > >>>>
> > >>>> Not a strong argument. I would actually propose to not add a new
> > >>>> package, but just a new class `TypedProcessor`.
> >>>>
> >>>>
> > >>>> For `ProcessorContext#forward` methods -- some of those methods are
> > >>>> already deprecated. While they will still be affected, it would be
> > >>>> worth marking them as deprecated in the wiki page, too.
> >>>>
> >>>>
> > >>>> @Guozhang: I don't think we should break source compatibility in a
> > >>>> minor release.
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 6/20/19 1:43 PM, Guozhang Wang wrote:
> >>>>> Hi John,
> >>>>>
> > >>>>> Thanks for the KIP! I have a few comments below:
> >>>>>
> > >>>>> 1. So far the "Motivation" section is very general, and the only
> > >>>>> concrete example that I have in mind is `TransformValues#punctuate`.
> > >>>>> Do we have any other concrete issues that drive this KIP? If not,
> > >>>>> then I feel it is better to narrow the scope of this KIP to:
> >>>>>
> > >>>>> 1.a) modifying ProcessorContext only with the output types on forward.
> > >>>>> 1.b) modifying the Transformer signature to have generics of
> > >>>>> ProcessorContext, and then lifting the restriction of not using
> > >>>>> punctuate: if users do not follow the enforced typing and just code
> > >>>>> without generics, they will get a warning at compile time and a
> > >>>>> run-time error if they forward wrong-typed records, which I think
> > >>>>> would be acceptable.
> >>>>>
> > >>>>> I feel this would be a good solution for this specific issue; again,
> > >>>>> feel free to update the wiki page with other known issues that cannot
> > >>>>> be resolved.
> >>>>>
> > >>>>> 2. If we want to go with the current scope, then my next question would
> > >>>>> be: how much breakage would we introduce if we just modify the
> > >>>>> Processor signature directly? My feeling is that DSL users would most
> > >>>>> likely not be affected and PAPI users would only need to modify a few
> > >>>>> lines in their class declarations. I feel it is worth doing some
> > >>>>> research on this part and then deciding if we really want to bite the
> > >>>>> bullet of duplicated Processor / ProcessorSupplier classes for
> > >>>>> maintaining compatibility.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>>
> > >>>>> On Wed, Jun 19, 2019 at 12:21 PM John Roesler <j...@confluent.io>
> > >>>>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> In response to the feedback so far, I changed the package name from
> >>>>>> `processor2` to `processor.generic`.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> > >>>>>> On Mon, Jun 17, 2019 at 4:49 PM John Roesler <j...@confluent.io>
> > >>>>>> wrote:
> >>>>>>>
> >>>>>>> Thanks for the feedback, Sophie!
> >>>>>>>
> > >>>>>>> I actually felt a little uneasy when I wrote that remark, because
> > >>>>>>> it's not restricted at all in the API, it's just available to you if
> > >>>>>>> you choose to give your stores and context the same parameters. So, I
> > >>>>>>> think your use case is valid, and also perfectly permissible under
> > >>>>>>> the current KIP. Sorry for sowing confusion on my own discussion
> > >>>>>>> thread!
> >>>>>>>
> > >>>>>>> I'm not crazy about the package name, either. I went with it only
> > >>>>>>> because there's seemingly nothing special about the new package
> > >>>>>>> except that it can't have the same name as the old one. Otherwise,
> > >>>>>>> the existing "processor" and "Processor" names for the package and
> > >>>>>>> class are perfectly satisfying. Rather than pile on additional
> > >>>>>>> semantics, it seemed cleaner to just add a number to the package
> > >>>>>>> name.
> >>>>>>>
> > >>>>>>> This wouldn't be the first project to do something like this...
> > >>>>>>> Apache Commons, for example, has added a "2" to the end of some of
> > >>>>>>> their packages for exactly the same reason.
> >>>>>>>
> > >>>>>>> I'm open to any suggestions. For example, we could do something like
> > >>>>>>> org.apache.kafka.streams.typedprocessor.Processor or
> > >>>>>>> org.apache.kafka.streams.processor.typed.Processor, which would
> > >>>>>>> have just about the same effect. One microscopic thought is that, if
> > >>>>>>> there's another interface in the "processor" package that we wish to
> > >>>>>>> do the same thing to, we _could_ pile it in to "processor2", but we
> > >>>>>>> couldn't do the same if we use a package that has "typed" in the
> > >>>>>>> name, unless that change is _also_ related to types in some way. But
> > >>>>>>> this seems like a very minor concern.
> >>>>>>>
> >>>>>>> What's your preference?
> >>>>>>> -John
> >>>>>>>
> > >>>>>>> On Mon, Jun 17, 2019 at 3:56 PM Sophie Blee-Goldman <sop...@confluent.io>
> > >>>>>>> wrote:
> >>>>>>>>
> > >>>>>>>> Hey John, thanks for writing this up! I like the proposal but
> > >>>>>>>> there's one point that I think may be too restrictive:
> > >>>>>>>>
> > >>>>>>>> "A processor that happens to use a typed store is actually emitting
> > >>>>>>>> the same types that it is storing."
> >>>>>>>>
> > >>>>>>>> I can imagine someone could want to leverage this new type safety
> > >>>>>>>> without also limiting how they can interact with/use their store. As
> > >>>>>>>> an (admittedly contrived) example, say you have an input stream of
> > >>>>>>>> purchases of a certain type (entertainment, food, etc), and on seeing
> > >>>>>>>> a new record you want to output how many types of purchase a shopper
> > >>>>>>>> has made more than 5 purchases of in the last month. Your state store
> > >>>>>>>> will probably be holding some more complicated PurchaseHistory object
> > >>>>>>>> (keyed by user), but your output is just a <User, Long>
> >>>>>>>>
> > >>>>>>>> I'm also not crazy about "processor2" as the package name ... not
> > >>>>>>>> sure what a better one would be though (something with "typed"?)
> >>>>>>>>
> > >>>>>>>> On Mon, Jun 17, 2019 at 12:47 PM John Roesler <j...@confluent.io>
> > >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> > >>>>>>>>> I'd like to propose KIP-478
> > >>>>>>>>> (https://cwiki.apache.org/confluence/x/2SkLBw).
> >>>>>>>>>
> > >>>>>>>>> This proposal would add output type bounds to the Processor
> > >>>>>>>>> interface in Kafka Streams, which enables static checking of a
> > >>>>>>>>> number of useful properties:
> > >>>>>>>>> * A processor B that consumes the output of processor A is actually
> > >>>>>>>>> expecting the same types that processor A produces.
> > >>>>>>>>> * A processor that happens to use a typed store is actually emitting
> > >>>>>>>>> the same types that it is storing.
> > >>>>>>>>> * A processor is simply forwarding the expected types in all code
> > >>>>>>>>> paths.
> > >>>>>>>>> * Processors added via the Streams DSL, which are not permitted to
> > >>>>>>>>> forward results at all are statically prevented from doing so by
> > >>>>>>>>> the compiler
> >>>>>>>>>
> > >>>>>>>>> Internally, we can use the above properties to achieve a much higher
> > >>>>>>>>> level of confidence in the Streams DSL implementation's correctness.
> > >>>>>>>>> Actually, while doing the POC, I found a few bugs and mistakes, which
> > >>>>>>>>> become structurally impossible with KIP-478.
> >>>>>>>>>
> > >>>>>>>>> Additionally, the stronger types dramatically improve the
> > >>>>>>>>> self-documentation of our Streams internal implementations, which
> > >>>>>>>>> makes it much easier for new contributors to ramp up with confidence.
> >>>>>>>>>
> >>>>>>>>> Thanks so much for your consideration!
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
>


--
-- Guozhang
