> Instead, why would we not have batch.mode=true/false?

Matthias already replied in detail, but let me also throw in my thoughts:

I'd argue that "there is no batch [mode]" (just like "there is no spoon"
for the Matrix fans out there :-P).  What would a batch mode look like?  It
is primarily defined by processing being done on bounded/finite data --
i.e. it's about the nature of the input data, not about the nature of the
execution mode.  The general case is processing on unbounded/infinite data,
i.e. what stream processing is typically associated with.

The question thus becomes how we would let users define which
bounded/finite data should be processed, i.e. how to split a continuous
stream into batch chunks.  The current proposal for incremental processing
does this (by intent) in a limited way, as Matthias outlined.
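
To make "splitting a continuous stream into batch chunks" concrete, here is
a minimal sketch in plain Python (no Kafka client involved).  It assumes the
bounded input is defined by freezing each partition's end offset at startup;
the names Log, snapshot_end_offsets, and run_bounded are hypothetical and are
not part of the KIP or of Kafka Streams.

```python
from typing import Dict, Iterator, List, Tuple

# Hypothetical stand-in for a topic: partition id -> list of records.
Log = Dict[int, List[str]]


def snapshot_end_offsets(log: Log) -> Dict[int, int]:
    """Freeze the current end offset of every partition (the stop point)."""
    return {partition: len(records) for partition, records in log.items()}


def run_bounded(
    log: Log, committed: Dict[int, int], stop_at: Dict[int, int]
) -> Iterator[Tuple[int, int, str]]:
    """Yield (partition, offset, record) from the committed position up to,
    but not including, the frozen stop offset, advancing `committed`."""
    for partition in sorted(stop_at):
        start = committed.get(partition, 0)
        for offset in range(start, stop_at[partition]):
            yield partition, offset, log[partition][offset]
            committed[partition] = offset + 1


log = {0: ["a", "b"], 1: ["c"]}
committed: Dict[int, int] = {}

stop_at = snapshot_end_offsets(log)
log[0].append("late")  # written after the snapshot, so left for the next run

first_run = [record for _, _, record in run_bounded(log, committed, stop_at)]
second_run = [
    record
    for _, _, record in run_bounded(log, committed, snapshot_end_offsets(log))
]
```

The first run covers only what existed when the stop offsets were frozen; the
record that arrived afterwards is picked up by the second run, which resumes
from the committed positions.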

Does that make sense?




On Thu, Dec 1, 2016 at 1:17 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Right now, there is only one config value and I am open to better
> suggestions.
>
> We did not go for batch.mode=true/false because we might want to have
> auto stop at specific stop-offsets or stop-timestamp later on. So we can
> extend the parameter with new values like autostop.at=timestamp in
> combination with a new parameter that does define the stop timestamp
> that gets applied over all input partitions.
>
> Of course, there are other ways to do this extension with different
> "where to stop" policies, too. However, using only "batch.mode" as the
> parameter name right now also has the disadvantage of being less
> self-descriptive than "autostop.at=eol" -- it is not immediately clear
> what "batch.mode" means and that it will stop at EOL.
>
> But as I said, I am more than happy to change this to a better name.
>
>
> -Matthias
>
>
> On 11/30/16 3:53 PM, Sriram Subramanian wrote:
> > I agree that the metadata topic is required to build a batching semantic
> > that is intuitive.
> >
> > One question on the config -
> >
> > autostop.at
> >
> > I see one value for it - eol. What other values can be used? Instead, why
> > would we not have batch.mode=true/false?
> >
> > On Wed, Nov 30, 2016 at 1:51 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Both types of intermediate topics are handled the exact same way and
> >> both types do connect different subtopologies (even if the user might
> >> not be aware that there are multiple subtopologies in case of internal
> >> data repartitioning). So there is no distinction between user
> >> intermediate topics (via through()) and internal intermediate
> >> repartitioning topics.
> >>
> >> I also do not understand your argument about "coupling instances". The
> >> only "synchronization" is at startup time until the marker is written.
> >> Afterwards all instances just run as always. Furthermore, the metadata
> >> topic will be written by the leader while computing the overall
> >> partition assignment. Thus, the metadata topic will be fully populated
> >> (including the marker) before the individual instances receive their
> >> assignment via the group management protocol. So there is no more
> >> "synchronization" than before, as group management synchronizes
> >> instances at startup anyway.
> >>
> >> About startup failure. Yes, there is the case that the leader could
> >> potentially fail before the marker gets written. For this case, we have
> >> to consider a few things:
> >>
> >> 1. the net effect is, that no data will be processed by any instance
> >>    (so application can start up, because no partition assignment will be
> >> distributed via group management, as the leader did fail while computing
> >> the assignment)
> >>
> >> 2. the failure would occur during partition assignment, which would be a
> >> severe failure anyway, and the application has bigger problems than a
> >> missing marker in the metadata topic (nobody will get partitions
> >> assigned as the leader did not finish the assignment computation)
> >>
> >> 3. if the leader fails, a different application instance will become the leader.
> >>    a) thus, if it is a permanent problem, eventually all instances are
> >> going down
> >>    b) if the problem is transient, the probability is very high that the
> >> new leader will not fail
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 11/30/16 1:21 PM, Eno Thereska wrote:
> >>> In the KIP, two types of intermediate topics are described, 1) ones that
> >>> connect two sub-topologies, and 2) others that are internal repartitioning
> >>> topics (e.g., for joins).
> >>> I wasn't envisioning stopping the consumption of (2) at the HWM. The HWM
> >>> can be used for the source topics only (so I agree with your "joins"
> >>> scenario, but for a different reason).
> >>>
> >>> The case I am worried about is (1) when there are implicit connections
> >>> between application instances where a 2nd instance's source topics would be
> >>> the 1st instance's output topics. In that case I was suggesting not to
> >>> couple those instances.
> >>>
> >>> In the (corner) case when the application fails repeatedly, it can still
> >>> fail right before we write to the metadata topic, so that corner case can
> >>> still happen. However, it should be extremely rare, and I'd argue if the
> >>> app is failing repeatedly N times there are bigger problems with the app.
> >>>
> >>> Eno
> >>>
> >>>> On 30 Nov 2016, at 11:52, Damian Guy <damian....@gmail.com> wrote:
> >>>>
> >>>> I think the KIP looks good. I also think we need the metadata topic
> >>>> in order to provide sane guarantees on what data will be processed.
> >>>>
> >>>> As Matthias has outlined in the KIP we need to know when to stop consuming
> >>>> from intermediate topics, i.e., topics that are part of the same application
> >>>> but are used for re-partitioning or through() etc. Without the metadata topic
> >>>> the consumption from the intermediate topics would always be one run
> >>>> behind. In the case of a join requiring partitioning this would result in
> >>>> no output for the first run and then in subsequent runs you'd get the
> >>>> output from the previous run - I'd find this a bit odd.
> >>>>
> >>>> Also, I think having a fixed HWM is a good idea. If you are running your
> >>>> streams app in some shared environment, then you don't want to get into a
> >>>> situation where the app fails (for random reasons), restarts with a new
> >>>> HWM, fails, restarts... and then continues to consume resources forever as
> >>>> the HWM is constantly moving forward. So I think the approach in the KIP
> >>>> helps batch-mode streams apps to be good citizens when running in shared
> >>>> environments.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Wed, 30 Nov 2016 at 10:40 Eno Thereska <eno.there...@gmail.com> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> I like the first part of the KIP. However, the second part with the
> >>>>> failure modes and metadata topic is quite complex and I'm worried it
> >>>>> doesn't solve the problems you mention under failure. For example, the
> >>>>> application can fail before writing to the metadata topic. In that case, it
> >>>>> is not clear what the second app instance should do (for the handling of
> >>>>> intermediate topics case). So in general, we have the problem of failures
> >>>>> during writes to the metadata topic itself.
> >>>>>
> >>>>> Also, for the intermediate topics example, I feel like we are trying to
> >>>>> provide some sort of synchronisation between app instances with this
> >>>>> approach. By default today such synchronisation does not exist. One
> >>>>> instance writes to the intermediate topic, and the other reads from it,
> >>>>> but only eventually. That is a nice way to decouple instances in my opinion.
> >>>>>
> >>>>> The user can always run the batch processing multiple times and eventually
> >>>>> all instances will produce some output. The user's app can check whether
> >>>>> the output size is satisfactory and then not run any further loops. So I
> >>>>> feel they can already get a lot with the simpler first part of the KIP.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>
> >>>>>> On 30 Nov 2016, at 05:45, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>
> >>>>>> Thanks for your input.
> >>>>>>
> >>>>>> To clarify: the main reason to add the metadata topic is to cope with
> >>>>>> subtopologies that are connected via intermediate topics (either
> >>>>>> user-defined via through() or internally created for data
> >>>>>> repartitioning).
> >>>>>>
> >>>>>> Without this handling, the behavior would be odd and user experience
> >>>>>> would be bad.
> >>>>>>
> >>>>>> Thus, using the metadata topic to have a "fixed HW" is just a small
> >>>>>> add-on -- and more or less for free, because the metadata topic is
> >>>>>> already there.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 11/29/16 7:53 PM, Neha Narkhede wrote:
> >>>>>>> Thanks for initiating this. I think this is a good first step towards
> >>>>>>> unifying batch and stream processing in Kafka.
> >>>>>>>
> >>>>>>> I understood this capability to be simple yet very useful; it allows a
> >>>>>>> Streams program to process a log, in batch, in arbitrary windows defined by
> >>>>>>> the difference between the HW and the current offset. Basically, it
> >>>>>>> provides a simple means for a Streams program to "stop" after processing a
> >>>>>>> batch (just like a batch program would) and continue where it left
> >>>>>>> off when restarted. In other words, it allows batch processing behavior for
> >>>>>>> a Streams app without code changes.
> >>>>>>>
> >>>>>>> This feature is useful but I do not think there is a necessity to add a
> >>>>>>> metadata topic. After all, the user doesn't really care as much about
> >>>>>>> exactly where the batch ends. This feature allows an app to "process as
> >>>>>>> much as there is data to process" and the way it determines how much data
> >>>>>>> there is to process is by reading the HW on startup. If there is new data
> >>>>>>> written to the log right after it starts up, it will process it when
> >>>>>>> restarted the next time. If it starts, reads HW but fails, it will restart
> >>>>>>> and process a little more before it stops again. The fact that the HW
> >>>>>>> changes in some scenarios isn't an issue since a batch program that behaves
> >>>>>>> this way doesn't really care exactly what that HW is.
> >>>>>>>
> >>>>>>> There might be cases which require adding more topics but I would shy away
> >>>>>>> from adding complexity wherever possible as it complicates operations and
> >>>>>>> reduces simplicity.
> >>>>>>>
> >>>>>>> Other than this issue, I'm +1 on adding this feature. I think it is pretty
> >>>>>>> powerful.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 28, 2016 at 10:48 AM Matthias J. Sax <matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I want to start a discussion about KIP-95:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>>>>>>>
> >>>>>>>> Looking forward to your feedback.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>> Thanks,
> >>>>>>> Neha
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >>
> >
>
>
