> Instead, why would we not have batch.mode=true/false?

Matthias already replied in detail, but let me also throw in my thoughts:
I'd argue that "there is no batch [mode]" (just like "there is no spoon" for the Matrix fans out there :-P). What would a batch mode look like? It is primarily defined by processing being done on bounded/finite data -- i.e. it's about the nature of the input data, not about the nature of the execution mode. The general case is processing on unbounded/infinite data, i.e. what stream processing is typically associated with.

The question thus becomes how we would let users define which bounded/finite data should be processed, i.e. how to split a continuous stream into batch chunks. The current proposal for incremental processing does this (by intent) in a limited way, as Matthias outlined.

Does that make sense?

On Thu, Dec 1, 2016 at 1:17 AM, Matthias J. Sax <[email protected]> wrote:

> Right now, there is only one config value, and I am open to better suggestions.
>
> We did not go for batch.mode=true/false because we might want to have auto-stop at specific stop offsets or a stop timestamp later on. So we can extend the parameter with new values like autostop.at=timestamp, in combination with a new parameter that defines the stop timestamp that gets applied over all input partitions.
>
> Of course, there are other ways to do this extension with different "where to stop" policies, too. However, using only "batch.mode" as the parameter name right now also has the disadvantage of being less self-descriptive compared to "autostop.at=eol" -- it is not immediately clear what "batch.mode" means and that it will stop at EOL.
>
> But as I said, I am more than happy to change this to a better name.
>
>
> -Matthias
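
To make the config discussion concrete, here is a minimal sketch of what this could look like on the application side. Note that "autostop.at" and the value "eol" are just the names proposed in KIP-95 and may still change as a result of this discussion:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class IncrementalBatchConfig {

    // Hypothetical config key taken from the KIP-95 proposal; not part of the released API.
    static final String AUTOSTOP_AT_CONFIG = "autostop.at";

    public static Properties batchRunConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "incremental-batch-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed semantics: stop once all input partitions reach the
        // high watermark ("end of log") captured at startup.
        props.put(AUTOSTOP_AT_CONFIG, "eol");
        return props;
    }
}

A batch.mode=true/false flag could express the same thing today, but it would not leave room for values like autostop.at=timestamp later on.
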
> On 11/30/16 3:53 PM, Sriram Subramanian wrote:
> > I agree that the metadata topic is required to build a batching semantic that is intuitive.
> >
> > One question on the config -
> >
> > autostop.at
> >
> > I see one value for it - eol. What other values can be used? Instead, why would we not have batch.mode=true/false?
> >
> > On Wed, Nov 30, 2016 at 1:51 PM, Matthias J. Sax <[email protected]> wrote:
> >
> >> Both types of intermediate topics are handled the exact same way, and both types do connect different subtopologies (even if the user might not be aware that there are multiple subtopologies in the case of internal data repartitioning). So there is no distinction between user intermediate topics (via through()) and internal intermediate repartitioning topics.
> >>
> >> I also do not understand your argument about "coupling instances". The only "synchronization" is at startup time, until the marker is written. Afterwards, all instances just run as always. Furthermore, the metadata topic will be written by the leader while computing the overall partition assignment. Thus, the metadata topic will be fully populated (including the marker) before the individual instances receive their assignments via the group management protocol. So there is no more "synchronization" than before, as group management synchronizes instances at startup anyway.
> >>
> >> About startup failure: yes, there is the case that the leader could potentially fail before the marker gets written. For this case, we have to consider a few things:
> >>
> >> 1. The net effect is that no data will be processed by any instance (the application can start up, but no partition assignment will be distributed via group management, as the leader failed while computing the assignment).
> >>
> >> 2. The failure would occur on partition assignment, which would be a severe failure anyway, and the application has bigger problems than a missing marker in the metadata topic (nobody will get partitions assigned, as the leader did not finish the assignment computation).
> >>
> >> 3. If the leader fails, a different instance will become the leader.
> >>    a) Thus, if it is a permanent problem, eventually all instances will go down.
> >>    b) If the problem is transient, the probability is very high that the new leader will not fail.
> >>
> >>
> >> -Matthias
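
To sketch the "leader writes the marker before anyone gets an assignment" idea in code -- purely illustrative; the metadata topic name, record format, and helper signature below are invented, and the actual KIP-95 design may differ:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class LeaderMetadataSketch {

    // Hypothetical topic name, for illustration only.
    static final String METADATA_TOPIC = "__streams_batch_metadata";

    // Capture the stop offsets for all input partitions and write them,
    // followed by a marker, before any assignment is handed out.
    public static void writeStopOffsets(KafkaConsumer<?, ?> consumer,
                                        Collection<TopicPartition> inputPartitions,
                                        KafkaProducer<String, String> producer) {
        Map<TopicPartition, Long> stopOffsets = consumer.endOffsets(inputPartitions);
        for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
            producer.send(new ProducerRecord<>(METADATA_TOPIC,
                    entry.getKey().toString(), entry.getValue().toString()));
        }
        // The marker tells all instances that the stop offsets above are complete.
        producer.send(new ProducerRecord<>(METADATA_TOPIC, "marker", "COMPLETE"));
        producer.flush(); // everything is on the broker before assignments go out
    }
}
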
> >> On 11/30/16 1:21 PM, Eno Thereska wrote:
> >>> In the KIP, two types of intermediate topics are described: 1) ones that connect two sub-topologies, and 2) others that are internal repartitioning topics (e.g., for joins).
> >>> I wasn't envisioning stopping the consumption of (2) at the HWM. The HWM can be used for the source topics only (so I agree with your "joins" scenario, but for a different reason).
> >>>
> >>> The case I am worried about is (1), when there are implicit connections between application instances where a 2nd instance's source topics would be the 1st instance's output topics. In that case I was suggesting not to couple those instances.
> >>>
> >>> In the (corner) case when the application fails repeatedly, it can still fail right before we write to the metadata topic, so that corner case can still happen. However, it should be extremely rare, and I'd argue that if the app is failing repeatedly N times there are bigger problems with the app.
> >>>
> >>> Eno
> >>>
> >>>> On 30 Nov 2016, at 11:52, Damian Guy <[email protected]> wrote:
> >>>>
> >>>> I think the KIP looks good. I also think we need the metadata topic in order to provide sane guarantees on what data will be processed.
> >>>>
> >>>> As Matthias has outlined in the KIP, we need to know when to stop consuming from intermediate topics, i.e., topics that are part of the same application but are used for repartitioning, through(), etc. Without the metadata topic, the consumption from the intermediate topics would always be one run behind. In the case of a join requiring repartitioning, this would result in no output for the first run, and then in subsequent runs you'd get the output from the previous run -- I'd find this a bit odd.
> >>>>
> >>>> Also, I think having a fixed HWM is a good idea. If you are running your streams app in some shared environment, then you don't want to get into a situation where the app fails (for random reasons), restarts with a new HWM, fails, restarts... and then continues to consume resources forever as the HWM is constantly moving forward. So I think the approach in the KIP helps batch-mode streams apps to be good citizens when running in shared environments.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Wed, 30 Nov 2016 at 10:40 Eno Thereska <[email protected]> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> I like the first part of the KIP. However, the second part, with the failure modes and metadata topic, is quite complex, and I'm worried it doesn't solve the problems you mention under failure. For example, the application can fail before writing to the metadata topic. In that case, it is not clear what the second app instance should do (for the handling of the intermediate topics case). So in general, we have the problem of failures during writes to the metadata topic itself.
> >>>>>
> >>>>> Also, for the intermediate topics example, I feel like we are trying to provide some sort of synchronisation between app instances with this approach. By default today such synchronisation does not exist. One instance writes to the intermediate topic, and the other reads from it, but only eventually. That is a nice way to decouple instances, in my opinion.
> >>>>>
> >>>>> The user can always run the batch processing multiple times, and eventually all instances will produce some output. The user's app can check whether the output size is satisfactory and then not run any further loops. So I feel they can already get a lot with the simpler first part of the KIP.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
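
The "run it again until the output looks right" approach Eno describes would live entirely in user code. A rough sketch, where runBatchOnce() and isOutputSatisfactory() are placeholders for whatever the application actually does:

public class RerunUntilDone {

    public static void main(String[] args) throws Exception {
        int maxRuns = 5; // safety bound so a misbehaving job cannot loop forever
        for (int run = 0; run < maxRuns && !isOutputSatisfactory(); run++) {
            runBatchOnce(); // start the Streams app, let it auto-stop, then return
        }
    }

    private static void runBatchOnce() throws Exception {
        // placeholder: build and start the KafkaStreams instance and wait for it to stop
    }

    private static boolean isOutputSatisfactory() {
        // placeholder: e.g., compare output topic end offsets against the input end offsets
        return false;
    }
}
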
> >>>>>> On 30 Nov 2016, at 05:45, Matthias J. Sax <[email protected]> wrote:
> >>>>>>
> >>>>>> Thanks for your input.
> >>>>>>
> >>>>>> To clarify: the main reason to add the metadata topic is to cope with subtopologies that are connected via intermediate topics (either user-defined via through() or internally created for data repartitioning).
> >>>>>>
> >>>>>> Without this handling, the behavior would be odd and the user experience would be bad.
> >>>>>>
> >>>>>> Thus, using the metadata topic to have a "fixed HW" is just a small add-on -- and more or less for free, because the metadata topic is already there.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 11/29/16 7:53 PM, Neha Narkhede wrote:
> >>>>>>> Thanks for initiating this. I think this is a good first step towards unifying batch and stream processing in Kafka.
> >>>>>>>
> >>>>>>> I understood this capability to be simple yet very useful; it allows a Streams program to process a log, in batch, in arbitrary windows defined by the difference between the HW and the current offset. Basically, it provides a simple means for a Streams program to stop after processing a batch (just like a batch program would) and continue where it left off when restarted. In other words, it allows batch processing behavior for a Streams app without code changes.
> >>>>>>>
> >>>>>>> This feature is useful, but I do not think there is a necessity to add a metadata topic. After all, the user doesn't really care as much about exactly where the batch ends. This feature allows an app to "process as much as there is data to process", and the way it determines how much data there is to process is by reading the HW on startup. If there is new data written to the log right after it starts up, it will process it when restarted the next time. If it starts and reads the HW but fails, it will restart and process a little more before it stops again. The fact that the HW changes in some scenarios isn't an issue, since a batch program that behaves this way doesn't really care exactly what that HW is.
> >>>>>>>
> >>>>>>> There might be cases which require adding more topics, but I would shy away from adding complexity wherever possible, as it complicates operations and reduces simplicity.
> >>>>>>>
> >>>>>>> Other than this issue, I'm +1 on adding this feature. I think it is pretty powerful.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 28, 2016 at 10:48 AM Matthias J. Sax <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I want to start a discussion about KIP-95:
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>>>>>>>
> >>>>>>>> Looking forward to your feedback.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>> --
> >>>>>>> Thanks,
> >>>>>>> Neha
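
P.S. For anyone who wants to see the "read the end offsets at startup, process up to them, then stop" semantics that Damian and Neha describe spelled out, here is a rough sketch against the plain consumer API. It only illustrates the intended behavior -- under the KIP, Streams would do this internally -- and process() is a placeholder:

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class FixedHwmBatchRun {

    // consumerProps must contain bootstrap.servers, group.id and byte[] deserializers.
    public static void runOnce(Properties consumerProps, Collection<TopicPartition> partitions) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.assign(partitions);

            // Capture the stop point once; it does not move even if producers keep writing.
            Map<TopicPartition, Long> stopOffsets = new HashMap<>(consumer.endOffsets(partitions));

            boolean done = false;
            while (!done) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                    process(record);
                }
                done = partitions.stream()
                        .allMatch(tp -> consumer.position(tp) >= stopOffsets.get(tp));
            }
            consumer.commitSync(); // the next batch run resumes from here
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // placeholder for the user's processing logic
    }
}
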
