Github user arunmahadevan commented on the issue:
https://github.com/apache/storm/pull/1693
@revans2 thanks for taking the time to do a detailed and thoughtful review.
> I have looked through the proposal, and I have a number of concerns about
> the API. I mostly want to be sure that we have consistent terminology about
> what different operations are intended to do. groupby is the big one for me,
> because it seems to be a routing function and not an actual groupby, but join
> also seemed a little off.
`groupby` does a fields grouping on the key. I will rename it. Maybe it need
not be exposed at all, since reduce/aggregate etc. can do implicit partitioning
on the key. I didn't do implicit partitioning in `aggregate` since Trident had
a concept of partitionAggregate (keyed aggregate per partition) and I thought
it might be useful. Maybe it's not needed.
I will add a `groupByKey()` API which returns an `Iterable<V>`, similar to
Beam/Spark.
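To make the intended semantics concrete, here is a plain-Java sketch of what a `groupByKey()`-style operation would produce: all values sharing a key collected into one `Iterable<V>` per key. The class and method names here are hypothetical illustrations, not the actual Storm streams API.

```java
import java.util.*;

// Illustrative sketch only: the grouping semantics a groupByKey()-style API
// would provide. Names are hypothetical, not part of the proposed Storm API.
public class GroupByKeySketch {

    // Collect the values that share a key into one Iterable<V> per key.
    static <K, V> Map<K, Iterable<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        // Expose each List<V> through the Iterable<V> interface.
        return new LinkedHashMap<>(grouped);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
            new AbstractMap.SimpleEntry<>("apple", 1),
            new AbstractMap.SimpleEntry<>("banana", 1),
            new AbstractMap.SimpleEntry<>("apple", 1));
        System.out.println(groupByKey(pairs)); // {apple=[1, 1], banana=[1]}
    }
}
```

In the streaming case the grouping would of course happen per partition after a fields grouping on the key, rather than over an in-memory list.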
> Why do we need so many lines of code to do a repartition/groupby in the
> word count example?
`repartition` is not really required; it was there only to show the usage. I
will remove it from the example. The groupby (partition by key) can also be
removed, and `aggregate` can do an implicit fields grouping.
> builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
>     .flatMap(s -> Arrays.asList(s.split(" ")))
>     .window(TumblingWindows.of(Duration.seconds(10)))
>     // short hand for .aggregateBy(1 /* index to use in grouping */, new Count<>(), 3
>     // /* new number of partitions, although we probably want to internally have many
>     // more partitions but only 3 executors to start out with */)
>     .countBy(1, 3)
>     .filter(x -> x.getSecond() >= 5)
>     .print();
Since count seems to be a common operation, it can be added as an additional
API. But `aggregate` is more general, and users can provide different
`Aggregator`s.
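The relationship between the two can be sketched in plain Java: count is just one instance of a generic per-key aggregation, which is why a `countBy`-style shorthand can delegate to a general `aggregateBy` taking an `Aggregator`. The interface and class names below are hypothetical illustrations, not the proposed API.

```java
import java.util.*;

// Illustrative sketch only: count as one instance of a generic aggregation.
// Names are hypothetical, not part of the proposed Storm API.
public class AggregateSketch {

    // Minimal aggregator contract: an initial value plus an accumulate step.
    interface Aggregator<V, A> {
        A init();
        A accumulate(A acc, V value);
    }

    // Count is just an Aggregator whose accumulate step ignores the value.
    static class Count<V> implements Aggregator<V, Long> {
        public Long init() { return 0L; }
        public Long accumulate(Long acc, V value) { return acc + 1; }
    }

    // Per-key aggregation: the shape aggregateBy would take after an
    // implicit fields grouping on the key.
    static <K, V, A> Map<K, A> aggregateByKey(List<Map.Entry<K, V>> pairs,
                                              Aggregator<V, A> agg) {
        Map<K, A> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> p : pairs) {
            A acc = result.containsKey(p.getKey()) ? result.get(p.getKey())
                                                   : agg.init();
            result.put(p.getKey(), agg.accumulate(acc, p.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> words = Arrays.asList(
            new AbstractMap.SimpleEntry<>("storm", 1),
            new AbstractMap.SimpleEntry<>("apache", 1),
            new AbstractMap.SimpleEntry<>("storm", 1));
        // A countBy shorthand would reduce to aggregateBy(..., new Count<>()).
        System.out.println(aggregateByKey(words, new Count<>())); // {storm=2, apache=1}
    }
}
```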
> But I am most concerned about expanding this API in the future. The stated
> plan is to start out with defining the API and implementing a back end that
> supports at-most-once and at-least-once processing and then move on to
> exactly-once processing. I really would like to have a plan in place on how
> we expect to do exactly-once processing before we go too deeply into the API.
The state checkpointing mechanism we have can be extended to provide
exactly-once semantics, e.g. with transactional updates and message
de-duplication similar to the MillWheel model, or by saving source-level spout
offsets.
The initial ideas for phase 2 are listed here -
https://docs.google.com/document/d/1Ew7uFF1UJ6e_zq0t4bM6A9auuEaArviAjjWYSpVFqPY/edit#heading=h.nmf14n
> My concern is that the APIs for a lot of different user facing things may
> need to change for exactly once,
The changes would mostly be in the underlying framework. We may add some
additional constraints, like requiring each emitted tuple to have a unique
message-id field and re-emitting with the same message id for exactly-once.
It can be non-intrusive, without changes at the API level.
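As a minimal sketch of that constraint, assuming the framework tracks message ids it has already applied: a replayed tuple carrying the same id is detected and skipped, so each update hits the state exactly once. All names here are hypothetical, and in practice the seen-id set would itself have to be checkpointed transactionally along with the state.

```java
import java.util.*;

// Illustrative sketch only: de-duplication by unique message id, the
// constraint described above. Names are hypothetical, not Storm APIs.
public class DedupSketch {
    private final Set<String> seenMessageIds = new HashSet<>();
    private long total = 0;

    // Apply the update only if this message id has not been seen before.
    boolean applyOnce(String messageId, long delta) {
        if (!seenMessageIds.add(messageId)) {
            return false; // duplicate re-emit: skip, state already updated
        }
        total += delta;
        return true;
    }

    long total() { return total; }

    public static void main(String[] args) {
        DedupSketch state = new DedupSketch();
        state.applyOnce("msg-1", 5);
        state.applyOnce("msg-2", 3);
        state.applyOnce("msg-1", 5); // replayed tuple, same id: ignored
        System.out.println(state.total()); // 8, not 13
    }
}
```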
> This also goes for the windows. Having more robust windowing may impact
> some of these APIs. Have we thought that through?
The underlying windowing framework has to be enhanced before this can be
supported. From the API point of view, the `window()` function right now takes
a window config builder, to which we can add additional APIs to support
triggers and so on. Something like
`stream.window(TumblingWindows.of(...).triggering(...).withAllowedLateness(Duration.standardDays(2)))`
and so on, and the window will trigger based on the window config.
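To illustrate what such a config would control, here is a small self-contained sketch of tumbling-window semantics with allowed lateness: each event-time timestamp maps to exactly one window, and an event is only droppable once the watermark passes its window's end plus the allowed lateness. The class and method names are hypothetical, not the proposed builder API.

```java
// Illustrative sketch only: tumbling-window assignment plus allowed
// lateness. Names are hypothetical, not part of the proposed Storm API.
public class TumblingWindowSketch {
    final long sizeMs;
    final long allowedLatenessMs;

    TumblingWindowSketch(long sizeMs, long allowedLatenessMs) {
        this.sizeMs = sizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Start of the tumbling window containing this event-time timestamp.
    long windowStart(long tsMs) {
        return tsMs - (tsMs % sizeMs);
    }

    // An event is too late (droppable) once the watermark has passed the
    // end of its window plus the allowed lateness.
    boolean isTooLate(long tsMs, long watermarkMs) {
        return windowStart(tsMs) + sizeMs + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        // 10-second windows, 2 seconds of allowed lateness.
        TumblingWindowSketch w = new TumblingWindowSketch(10_000, 2_000);
        System.out.println(w.windowStart(13_500));       // 10000
        System.out.println(w.isTooLate(13_500, 21_000)); // false: still within lateness
        System.out.println(w.isTooLate(13_500, 23_000)); // true: 20000 + 2000 <= 23000
    }
}
```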