Hi Aljoscha,

The watermark reporting is done via ProcessContinuation.futureOutputWatermark, at the granularity of returning from individual processElement() calls: you return from the call and give a watermark on your future output. We assume that updating the watermark at a per-bundle level is sufficient (or, if not, that you can make bundles small enough), because that's the same level at which state changes, timers etc. are committed.

It can be implemented by setting a per-key watermark hold and updating it when each call for this element returns. That's the way it is implemented in my current prototype https://github.com/apache/incubator-beam/pull/896 (see SplittableParDo.ProcessFn).

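To make the hold mechanism concrete, here is a rough sketch in Python. It is not Beam code and not the prototype's implementation: ToyRunner, process_element, the in-memory PARTITIONS data and the "stop after 2 records" checkpoint policy are all hypothetical names and assumptions made up for illustration. It models one worker thread that fires per-key timers; each call returns a residual start offset plus a watermark on its future output, the runner stores that watermark as the key's hold when the call returns, and the overall output watermark is the minimum hold across keys.

```python
from collections import deque

# Hypothetical toy data, not the Beam API: records are (offset, event_time) pairs.
PARTITIONS = {
    "K1": [(0, 10), (1, 12), (2, 15), (3, 20)],
    "K2": [(0, 5), (1, 6), (2, 30)],
}
MAX_RECORDS_PER_CALL = 2  # assumed checkpoint policy: stop after N records


def process_element(key, start_offset):
    """One processElement()-style call over the restriction [start_offset, inf).

    Returns (outputs, residual_start, future_output_watermark, resume).
    """
    records = PARTITIONS[key]
    end = min(start_offset + MAX_RECORDS_PER_CALL, len(records))
    outputs = records[start_offset:end]
    if end >= len(records):
        # Nothing left in this toy partition: the analogue of "do not resume".
        return outputs, end, float("inf"), False
    # Future output will not be older than the next unread record.
    return outputs, end, records[end][1], True


class ToyRunner:
    def __init__(self, keys):
        self.state = {k: 0 for k in keys}              # per-key residual start offset
        self.holds = {k: float("-inf") for k in keys}  # per-key watermark hold
        self.timers = deque(keys)                      # keys with a pending resume timer
        self.emitted = []

    def output_watermark(self):
        # The slowest key's hold bounds the overall output watermark.
        return min(self.holds.values(), default=float("inf"))

    def fire_next_timer(self):
        key = self.timers.popleft()
        outputs, residual, watermark, resume = process_element(key, self.state[key])
        self.emitted.extend((key, offset) for offset, _ in outputs)
        if resume:
            self.state[key] = residual   # store the residual restriction
            self.holds[key] = watermark  # update the hold when the call returns
            self.timers.append(key)      # set a timer to resume later
        else:
            del self.state[key]          # per-key state is garbage-collected
            del self.holds[key]          # and the hold is released

    def run(self):
        watermarks = []
        while self.timers:
            self.fire_next_timer()
            watermarks.append(self.output_watermark())
        return watermarks


runner = ToyRunner(["K1", "K2"])
watermarks = runner.run()
```

In this toy run the output watermark stays at minus infinity until both keys have reported a hold, then advances as each call returns, and is released once both finite partitions are exhausted. A real unbounded partition would of course never reach the "do not resume" branch.
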
On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I have another question about this: currently, unbounded sources have special logic for determining the watermark and the system periodically asks the sources for the current watermark. As I understood it, watermarks are only "generated" at the sources. How will this work when sources are implemented as a combination of DoFns and SplittableDoFns? Will SplittableDoFns be asked for a watermark, does this mean that watermarks can then be "generated" at any operation?
>
> Cheers,
> Aljoscha
>
> On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>
> > Hi JB,
> >
> > Yes, I'm assuming you're referring to the "magic" part on the transform expansion diagram. This is indeed runner-specific, and timers+state are likely the simplest way to do this for an SDF that does unbounded amount of work.
> >
> > On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> > > Anyway, from a runner perspective, we will have kind of API (part of the Runner API) to "orchestrate" the SDF as we discussed during the call, right ?
> > >
> > > Regards
> > > JB
> > >
> > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > > > Hi Aljoscha,
> > > > This is an excellent question! And the answer is, we don't need any new concepts like "SDF executor" and can rely on the per-key state and timers machinery that already exists in all runners because it's necessary to implement windowing/triggering properly.
> > > >
> > > > Note that this is already somewhat addressed in the previously posted State and Timers proposal https://s.apache.org/beam-state , under "per-key workflows".
> > > >
> > > > Think of it this way, using the Kafka example: we'll expand it into a transform:
> > > >
> > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for partition in topic.listPartitions() }
> > > > (2) GroupByKey
> > > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the proposal/slides }
> > > > - R is the OffsetRange restriction which in this case will be always of the form [startOffset, inf).
> > > > - there'll be just 1 value per key, but we use GBK to just get access to the per-key state/timers machinery. This may be runner-specific; maybe some runners don't need a GBK to do that.
> > > >
> > > > Now suppose the topic has two partitions, P1 and P2, and they get assigned unique keys K1, K2.
> > > > Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), (K2, topic, P2, [0, inf)).
> > > > Suppose we have just 1 worker with just 1 thread. Now, how will this thread be able to produce elements from both P1 and P2? Here's how.
> > > >
> > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a certain time or after a certain number of elements are output (just like with the current UnboundedSource reading code) producing a residual restriction R1' (basically a new start offset), put R1' into the per-key state and set a timer T1 to resume.
> > > > Then it will process (K2, topic, P2, [0, inf)), do the same producing a residual restriction R2' and setting a timer T2 to resume.
> > > > Then timer T1 will fire in the context of the key K1. The thread will call processElement again, this time supplying R1' as the restriction; the process repeats and after a while it checkpoints and stores R1'' into state of K1.
> > > > Then timer T2 will fire in the context of K2, run processElement for a while, set a new timer and store R2'' into the state of K2.
> > > > Etc.
> > > > If partition 1 goes away, the processElement call will return "do not resume", so a timer will not be set and instead the state associated with K1 will be GC'd.
> > > >
> > > > So basically it's almost like cooperative thread scheduling: things run for a while, until the runner tells them to checkpoint, then they set a timer to resume themselves, and the runner fires the timers, and the process repeats. And, again, this only requires things that runners can already do - state and timers, but no new concept of SDF executor (and consequently no necessity to choose/tune how many you need).
> > > >
> > > > Makes sense?
> > > >
> > > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >
> > > >> Hi,
> > > >> I have another question that I think wasn't addressed in the meeting. At least it wasn't mentioned in the notes.
> > > >>
> > > >> In the context of replacing sources by a combination of two SDFs, how do you determine how many "SDF executor" instances you need downstream? For the sake of argument assume that both SDFs are executed with parallelism 1 (or one per worker). Now, if you have a file source that reads from a static set of files the first SDF would emit the filenames while the second SDF would receive the filenames and emit their contents. This works well and the downstream SDF can process one filename after the other. Now, think of something like a Kafka source.
> > > >> The first SDF would emit the partitions (say 4 partitions, in this example) and the second SDF would be responsible for reading from a topic and emitting elements. Reading from one topic never finishes so you can't process the topics in series. I think you would need to have 4 downstream "SDF executor" instances. The question now is: how do you determine whether you are in the first or the second situation?
> > > >>
> > > >> Probably I'm just overlooking something and this is already dealt with somewhere... :-)
> > > >>
> > > >> Cheers,
> > > >> Aljoscha
> > > >>
> > > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:
> > > >>
> > > >>> Hello,
> > > >>>
> > > >>> Thanks for the notes both Dan and Eugene, and for taking the time to do the presentation and answer our questions.
> > > >>>
> > > >>> I mentioned the ongoing work on dynamic scaling on Flink because I suppose that it will address dynamic rebalancing eventually (there are multiple changes going on for dynamic scaling).
> > > >>>
> > > >>> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
> > > >>>
> > > >>> https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8
> > > >>>
> > > >>> Anyway I am far from an expert on Flink, but probably the Flink guys can give their opinion about this and refer to a more precise document than the ones I mentioned.
> > > >>>
> > > >>> Thanks again,
> > > >>> Ismaël
> > > >>>
> > > >>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > > >>>
> > > >>>> Great summary Eugene and Dan.
> > > >>>>
> > > >>>> And thanks again for the details, explanation, and discussion.
> > > >>>>
> > > >>>> Regards
> > > >>>> JB
> > > >>>>
> > > >>>>
> > > >>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> > > >>>>
> > > >>>>> Thanks for attending, everybody!
> > > >>>>>
> > > >>>>> Here are meeting notes (thanks Dan!).
> > > >>>>>
> > > >>>>> Q: Will SplittableDoFn enable better repartitioning of the input/output data?
> > > >>>>> A: Not really; repartitioning is orthogonal to SDF.
> > > >>>>>
> > > >>>>> Current Source API suffers from lack of composition and scalability because we treat sources too much as metadata, not enough as data.
> > > >>>>>
> > > >>>>> Q(slide with transform expansion): who does the "magic"?
> > > >>>>> A: The runner. Checkpointing and dynamically splitting restrictions will require collaboration with the runner.
> > > >>>>>
> > > >>>>> Q: How does the runner interact with the DoFn to control the restrictions? Is it related to the centralized job tracker etc.?
> > > >>>>> A: RestrictionTracker is a simple helper object, that exists purely on the worker while executing a single partition, and interacts with the worker harness part of the runner. Not to be confused with the centralized job tracker (master) - completely unrelated. Worker harness, of course, interacts with the master in some relevant ways (e.g. Dataflow master can tell "you're a straggler, you should split").
> > > >>>>>
> > > >>>>> Q: Is this a new DoFn subclass, or how will this integrate with the existing code?
> > > >>>>> A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-dofn) - just another optional parameter of type RestrictionTracker to processElement() which is dynamically bound via reflection, so fully backward/forward compatible, and looks to users like a regular DoFn.
> > > >>>>>
> > > >>>>> Q: why is fractionClaimed a double?
> > > >>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic rebalancing) requires a uniform way to represent progress through different sources.
> > > >>>>>
> > > >>>>> Q: Spark runner is microbatch-based, so this seems to map well onto checkpoint/resume, right?
> > > >>>>> A: Yes; actually the Dataflow runner is, at a worker level, also microbatch-based. The way SDF interacts with a runner will be very similar to how a Bounded/UnboundedSource interacts with a runner.
> > > >>>>>
> > > >>>>> Q: Using SDF, what would be the "packaging" of the IO?
> > > >>>>> A: Same as currently: package IO's as PTransforms and their implementation under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g. Datastore was recently refactored from BoundedSource to ParDo (ended up simpler and more scalable), transparently to users.
> > > >>>>>
> > > >>>>> Q: What's the timeline; what to do with the IOs currently in development?
> > > >>>>> A: Timeline is O(months). Keep doing what you're doing and working on top of Source APIs when necessary and simple ParDo's otherwise.
> > > >>>>>
> > > >>>>> Q: What's the impact for the runner writers?
> > > >>>>> A: Tentatively expected that most of the code for running an SDF will be common to runners, with some amount of per-runner glue code, just like GBK/windowing/triggering. Impact on Dataflow runner is larger since it supports dynamic rebalancing in batch mode and this is the hardest part, but for other runners shouldn't be too hard.
> > > >>>>>
> > > >>>>> JB: Talend has people who can help with this: e.g. help integrate into Spark runner, refactor IOs etc. Amit also willing to chat about supporting SDF in Spark runner.
> > > >>>>>
> > > >>>>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will send a link.
> > > >>>>>
> > > >>>>>
> > > >>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > > >>>>>
> > > >>>>>> Hi Eugene,
> > > >>>>>>
> > > >>>>>> thanks for the reminder.
> > > >>>>>>
> > > >>>>>> Just to prepare some topics for the call, please find some points:
> > > >>>>>>
> > > >>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds to me that we can keep the IO packaging style (using with* setters for the IO configuration) and replace PTransform, Source, Reader, ... directly with SDF. Correct ?
> > > >>>>>>
> > > >>>>>> 2. What's your plan in terms of release to include SDF ? We have several IOs in preparation and I wonder if it's worth to start to use the new SDF API or not.
> > > >>>>>>
> > > >>>>>> 3. What's the impact for the runner writers ? The runners will have to support SDF, that could be tricky depending on the execution engine.
> > > >>>>>> In the worst case where the runner can't fully support SDF, does it mean that most of our IOs will be useless ?
> > > >>>>>>
> > > >>>>>> Just my dumb topics ;)
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> See you at 8am !
> > > >>>>>>
> > > >>>>>> Regards
> > > >>>>>> JB
> > > >>>>>>
> > > >>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> > > >>>>>>
> > > >>>>>>> Hello everybody,
> > > >>>>>>>
> > > >>>>>>> Just a reminder:
> > > >>>>>>>
> > > >>>>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to join the call go to https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
> > > >>>>>>> I intend to go over the proposed design and then have a free-form discussion.
> > > >>>>>>>
> > > >>>>>>> Please have a skim through the proposal doc: https://s.apache.org/splittable-do-fn
> > > >>>>>>> I also made some slides that are basically a trimmed-down version of the doc to use as a guide when conducting the meeting, https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing .
> > > >>>>>>>
> > > >>>>>>> I will post notes from the meeting on this thread afterwards.
> > > >>>>>>>
> > > >>>>>>> Thanks, looking forward.
> > > >>>>>>>
> > > >>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid> wrote:
> > > >>>>>>>
> > > >>>>>>>> This is pretty cool! I'll be there too. (unless the hangout gets too full -- if so, I'll drop out in favor of others who aren't lucky enough to get to talk to Eugene all the time.)
> > > >>>>>>>>
> > > >>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> +1 I'll join
> > > >>>>>>>>>
> > > >>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> + 1, me2
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> +1 as in I'll join ;-)
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Sounds good, thanks!
> > > >>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> > > >>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about Friday 19th ?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Regards
> > > >>>>>>>>>>>>> JB
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:
> > > >>>>>>>>>>>>>> Hi JB,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Sounds great, does the suggested time over videoconference work for you?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Eugene
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> May we talk together next week ? I like the proposal. I would just need some details for my understanding.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks
> > > >>>>>>>>>>>>>>> Regards
> > > >>>>>>>>>>>>>>> JB
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:
> > > >>>>>>>>>>>>>>>> Hi JB,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> What are your thoughts on this?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain more about this proposal if necessary, since I understand it is a lot to digest.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > > >>>>>>>>>>>>>>>> (link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn - I confirmed that it can be joined without being logged into a Google account)
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Who'd be interested in attending, and does this time/date work for people?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> It sounds like you are concerned about continued support for existing IO's people have developed, and about backward compatibility?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all existing Source-based connectors will continue to work [though the document proposes at some point to make Read.from(Source) translate to a wrapper SDF under the hood, to exercise the feature more and to make sure that it is strictly more powerful - but this is an optional implementation detail].
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly - "replacing the Source API": a better phrasing would be "introducing a new API so powerful and easy-to-use that hopefully people will choose it over the Source API all the time, even though they don't have to" :) And we can discuss whether or not to actually deprecate/remove the Source API at some point down the road, once it becomes clear whether this is the case or not.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> To give more context: this proposal came out of discussions within the SDK team over the past ~1.5 years, before the Beam project existed, on how to make major improvements to the Source API; perhaps it will clarify things if I give a history of the ideas discussed:
> > > >>>>>>>>>>>>>>>>> - The first idea was to introduce a Read.from(PCollection<Source>) transform while keeping the Source API intact - this, given appropriate implementation, would solve most of the scalability and composability issues of IO's. Then most connectors would look like : ParDo<A, Source<B>> + Read.from().
> > > >>>>>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary abstraction, as it simply holds data. What if we only had a Reader<S, B> class where S is the source type and B the output type?
> > > >>>>>>>>>>>>>>>>> Then connectors would be something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> > > >>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features of Source are useful to ParDo's as well: e.g. ability to report progress when processing a very heavy element, or ability to produce very large output in parallel.
> > > >>>>>>>>>>>>>>>>> - The two previous bullets were already hinting that the Read.using() primitive might not be so special: it just takes S and produces B: isn't that what a ParDo does, plus some source magic, minus the convenience of c.output() vs. the start/advance() state machine?
> > > >>>>>>>>>>>>>>>>> - At this point it became clear that we should explore unifying sources and ParDo's, in particular: can we bring the magic of sources to ParDo's but without the limitations and coding inconveniences? And this is how SplittableDoFn was born: bringing source magic to a DoFn by providing a RangeTracker.
> > > >>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became clear that it is strictly more general than sources; at least, in the respect that sources have to produce output, while DoFn's don't: an SDF may very well produce no output at all, and simply perform a side effect in a parallel/resumable way.
> > > >>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on unifying the bounded/unbounded cases, on the particulars of RangeTracker APIs reconciling parallelization and checkpointing, what the relation between SDF and DF should be, etc. They culminated in the current proposal. The proposal comes at a time when a couple of key ingredients are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers proposal to enable unbounded work per element.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> To put it shortly:
> > > >>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and will support writing new ones, possibly forever. There is no interference with current users of Source.
> > > >>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API, taken to its logical limit where it turns out that users' goals can be accomplished easier and more generically entirely within ParDo's.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi Eugene,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and not an improvement of Source ?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will have to refactor all existing IO: basically, what you propose is to remove all Source to replace with NewDoFn.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I'm concerned with this approach, especially in terms of timing: clearly, the IO is the area where we have to move forward in Beam as it will allow new users to start in their projects.
> > > >>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ... and some people started to learn the IO API (Bounded/Unbounded source, etc).
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API (Source) instead of introducing a NewDoFn.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ?
;) > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> Regards > > > >>>>>>>>>>>>>>>>>> JB > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote: > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Hello Beam community, > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would > > > like > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> to > > > >>>>>>>> > > > >>>>>>>>> propose > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn, > > which > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> allows > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> processing > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> checkpointable > > > >>>>>>>>> > > > >>>>>>>>>> and > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount of > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>> work > > > >>>>>>>>> > > > >>>>>>>>>> per > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> element. 
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> This allows effectively replacing the current Bounded/UnboundedSource
> > > >>>>>>>>>>>>>>>>>>> APIs with DoFn's that are much easier to code, more scalable and
> > > >>>>>>>>>>>>>>>>>>> composable with the rest of the Beam programming model, and enables
> > > >>>>>>>>>>>>>>>>>>> many use cases that were previously difficult or impossible, as well
> > > >>>>>>>>>>>>>>>>>>> as some non-obvious new use cases.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA [BEAM-65] and some
> > > >>>>>>>>>>>>>>>>>>> Beam meetings, and now the whole thing is written up in a document:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> https://s.apache.org/splittable-do-fn
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Here are some things that become possible with Splittable DoFn:
> > > >>>>>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of files
> > > >>>>>>>>>>>>>>>>>>> - Read a collection of files that are produced by an earlier step in
> > > >>>>>>>>>>>>>>>>>>>   the pipeline (e.g. easily implement a connector to a storage system
> > > >>>>>>>>>>>>>>>>>>>   that can export itself to files)
> > > >>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list partitions" DoFn with
> > > >>>>>>>>>>>>>>>>>>>   a DoFn that simply polls a consumer and outputs new records in a
> > > >>>>>>>>>>>>>>>>>>>   while() loop
> > > >>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that incrementally
> > > >>>>>>>>>>>>>>>>>>>   returns new files in a directory and a DoFn that tails a file
> > > >>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" algorithm (matrix
> > > >>>>>>>>>>>>>>>>>>>   squaring) with good work balancing
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka reader written
> > > >>>>>>>>>>>>>>>>>>> against this API:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>     ProcessContinuation processElement(
> > > >>>>>>>>>>>>>>>>>>>         ProcessContext context, OffsetRangeTracker tracker) {
> > > >>>>>>>>>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
> > > >>>>>>>>>>>>>>>>>>>           Kafka.subscribe(context.element().topic,
> > > >>>>>>>>>>>>>>>>>>>                           context.element().partition)) {
> > > >>>>>>>>>>>>>>>>>>>         consumer.seek(tracker.start());
> > > >>>>>>>>>>>>>>>>>>>         while (true) {
> > > >>>>>>>>>>>>>>>>>>>           ConsumerRecords<String, String> records = consumer.poll(100ms);
> > > >>>>>>>>>>>>>>>>>>>           if (records == null) return done();
> > > >>>>>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String> record : records) {
> > > >>>>>>>>>>>>>>>>>>>             if (!tracker.tryClaim(record.offset())) {
> > > >>>>>>>>>>>>>>>>>>>               return resume().withFutureOutputWatermark(record.timestamp());
> > > >>>>>>>>>>>>>>>>>>>             }
> > > >>>>>>>>>>>>>>>>>>>             context.output(record);
> > > >>>>>>>>>>>>>>>>>>>           }
> > > >>>>>>>>>>>>>>>>>>>         }
> > > >>>>>>>>>>>>>>>>>>>       }
> > > >>>>>>>>>>>>>>>>>>>     }
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> The document describes in detail the motivations behind this feature,
> > > >>>>>>>>>>>>>>>>>>> the basic idea and API, open questions, and outlines an incremental
> > > >>>>>>>>>>>>>>>>>>> delivery plan.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > > >>>>>>>>>>>>>>>>>>> and is loosely related to "State and Timers for DoFn" [beam-state].
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Please take a look and comment!
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > >>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
> > > >>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>>> Jean-Baptiste Onofré
> > > >>>>>>>>>>>>>>>>>> jbono...@apache.org
> > > >>>>>>>>>>>>>>>>>> http://blog.nanthrax.net
> > > >>>>>>>>>>>>>>>>>> Talend - http://www.talend.com
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Thanks,
> > > >>>>>>>>> Andrew
> > > >>>>>>>>>
> > > >>>>>>>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > > >>>>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > > >>>>>>>>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
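
For anyone skimming the quoted Kafka example: the part that makes processing of one element "checkpointable" is the tracker.tryClaim() check. Below is a minimal, self-contained sketch of that idea only. It is not Beam code; SimpleOffsetTracker, checkpoint() and processOnce() are hypothetical names made up for illustration, and a real runner would decide when to checkpoint rather than the loop itself. The assumption is simply that a call may only output offsets it has successfully claimed, and that a checkpoint shrinks the claimable range and hands the remainder to a later call.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimSketch {
    /** Hypothetical stand-in for the proposal's OffsetRangeTracker; not Beam's real class. */
    static final class SimpleOffsetTracker {
        private final long start;
        private long end;          // exclusive; shrinks when a checkpoint is taken
        private long lastClaimed;

        SimpleOffsetTracker(long start, long end) {
            this.start = start;
            this.end = end;
            this.lastClaimed = start - 1;
        }

        long start() { return start; }

        /** Claims an offset; returns false once it falls outside the (possibly shrunk) range. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= end) return false;
            lastClaimed = offset;
            return true;
        }

        /** Cuts the range after the last claimed offset and returns the unclaimed remainder. */
        synchronized SimpleOffsetTracker checkpoint() {
            SimpleOffsetTracker residual = new SimpleOffsetTracker(lastClaimed + 1, end);
            end = lastClaimed + 1;
            return residual;
        }
    }

    /** Outputs claimed offsets; simulates a runner checkpoint after maxPerCall outputs. */
    static SimpleOffsetTracker processOnce(SimpleOffsetTracker tracker, int maxPerCall, List<Long> out) {
        SimpleOffsetTracker residual = null;
        int emitted = 0;
        for (long offset = tracker.start(); tracker.tryClaim(offset); offset++) {
            out.add(offset);
            if (++emitted == maxPerCall) {
                residual = tracker.checkpoint(); // the next tryClaim will now fail
            }
        }
        return residual; // null means the whole range was processed
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(0, 10);
        int calls = 0;
        while (tracker != null) {
            tracker = processOnce(tracker, 4, out); // resume from the residual range
            calls++;
        }
        System.out.println(calls + " calls, output = " + out);
    }
}
```

In this sketch every offset is output exactly once even though the work is spread over several calls, which is the property the claim check is there to protect.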