Re: Introducing a Redistribute transform

2016-10-10 Thread Eugene Kirpichov
Hi Amit,

The transform, the way it's implemented, actually does several things at
the same time, and that's why it's tricky to document.

Redistribute.arbitrarily():
- Introduces a fusion barrier (in runners that have it), making sure that
the runner can fully parallelize processing the output PCollection with
DoFn's
- Introduces a fault-tolerance barrier, effectively "checkpointing" the
input PCollection (again, in runners where it makes sense) and making sure
that processing elements of the output PCollection with a DoFn, if the DoFn
fails, will redo only that processing, but not need to recompute the input
PCollection.

Redistribute.byKey():
- All of the above and also makes the collection "key-partitioned", giving
access to per-key state to downstream key-preserving DoFns. However, this
is also runner-specific, because it's conceivable that a runner might not
need this "key-partitioned" property (in fact it's best if a runner
inserted such a "redistribute by key" automatically if it needs it...), and
it currently isn't exposed anyway.

Still thinking about the best way to describe this in a way that's least
confusing to users.
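
In code, the IO pattern from the PR looks roughly like this (a minimal Java
sketch; SplitQueryFn and ReadPartFn are hypothetical DoFns standing in for an
IO's split and read stages, and the exact generics of the PR's API may differ):

    // Split a query into parts, then read each part in its own ParDo.
    PCollection<String> parts =
        queries.apply("SplitQuery", ParDo.of(new SplitQueryFn()));

    // Without a fusion break, a runner may fuse SplitQuery and ReadPart and
    // run all reads sequentially on the worker that performed the split.
    // Redistribute.arbitrarily() breaks fusion and "checkpoints" 'parts', so
    // a failing read redoes only the read, not the splitting.
    PCollection<MyRecord> records = parts
        .apply("BreakFusion", Redistribute.<String>arbitrarily())
        .apply("ReadPart", ParDo.of(new ReadPartFn()));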

Regarding redistributing into N shards: this is problematic because it
doesn't seem to make sense in the unified model (in streaming in particular
- having N keys doesn't mean you have N bundles), and breaks down if you
add dynamic work rebalancing, backups and other magic. So I decided not to
bother with this in that PR.

Agreed with Robert that limiting the parallelism, or throttling, are very
useful features, but Redistribute is not the right place to introduce them.

On Mon, Oct 10, 2016 at 12:58 PM Amit Sela  wrote:

> On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw wrote:
>
> > On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela wrote:
> >
> > > Hi Eugene,
> > >
> > > This is very interesting.
> > > Let me see if I get this right, the "Redistribute" transformation
> > > assigns a "running id" key (per-bundle), calls "Redistribute.byKey",
> > > and extracts back the values, correct ?
> >
> > The keys are (pseudorandomly) unique per element.
> >
> > > As for "Redistribute.byKey" - it's made of a GroupByKey transformation
> > > that follows a Window transformation that neutralises the "resolution"
> > > of triggers and panes that usually occurs in GroupByKey, correct ?
> > >
> > > So this is basically a "FanOut" transformation which will depend on the
> > > available resources of the runner (and the uniqueness of the assigned
> > > keys) ?
> > >
> > > Would we want to Redistribute into a user-defined number of bundles (>
> > > current) ?
> >
> > I don't think there's any advantage to letting the user specify a
> > number here; the data is spread out among as many machines as are
> > handling the shuffling (for N elements, there are ~N unique keys,
> > which gets partitioned by the system to the M workers).
> >
> > > How about "FanIn" ?
> >
> > Could you clarify what you would hope to use this for?
> >
> Well, what if for some reason I would want to limit parallelism for a step
> in the Pipeline ? like calling an external service without "DDoS"ing it ?
>
> > > On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov wrote:
> > >
> > >> Hello,
> > >>
> > >> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
> > >> introduce a transform called "Redistribute", encapsulating a
> > >> relatively common pattern - a "fusion break" [see
> > >> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> > >> previously providing advice on that] - useful e.g. when you write an
> > >> IO as a sequence of ParDo's: split a query into parts, read each part,
> > >> and you want to prevent fusing these ParDo's because that would make
> > >> the whole thing execute sequentially, and in other similar cases.
> > >>
> > >> The PR also uses it, as an example, in DatastoreIO and JdbcIO, both of
> > >> which used to have a hand-rolled implementation of the same. The Write
> > >> transform has something similar, but not quite identical, so I skipped
> > >> it.
> > >>
> > >> This is not a model change - merely providing a common implementation
> > >> of something useful that already existed but was scattered across the
> > >> codebase.
> > >>
> > >> Redistribute also subsumes the old mostly-internal Reshuffle transform
> > >> via Redistribute.byKey().
> > >>
> > >> I tried finding more cases in the Beam codebase that have an ad-hoc
> > >> implementation of this; I did not find any, but I might have missed
> > >> something. I suppose the transform will need to be advertised in
> > >> documentation on best-practices for connector development; perhaps
> > >> some StackOverflow

Re: Introducing a Redistribute transform

2016-10-10 Thread Robert Bradshaw
On Mon, Oct 10, 2016 at 12:57 PM, Amit Sela  wrote:

>> > So this is basically a "FanOut" transformation which will depend on the
>> > available resources of the runner (and the uniqueness of the assigned
>> > keys) ?
>> >
>> > Would we want to Redistribute into a user-defined number of bundles (>
>> > current) ?
>>
>> I don't think there's any advantage to letting the user specify a
>> number here; the data is spread out among as many machines as are
>> handling the shuffling (for N elements, there are ~N unique keys,
>> which gets partitioned by the system to the M workers).
>>
>> > How about "FanIn" ?
>>
>> Could you clarify what you would hope to use this for?
>>
> Well, what if for some reason I would want to limit parallelism for a step
> in the Pipeline ? like calling an external service without "DDoS"ing it ?

I think this is something that is more difficult to enforce without
runner-specific support. For example, if one writes

input.apply(Redistribute(N)).apply(ParDo(...))

one is assuming that fusion takes place such that the subsequent ParDo
doesn't happen to get processed by more-than-expected shards. It's
also much simpler to spread the elements out among 2^64 keys than to
spread them out among a small N keys, and choosing exactly N keys isn't
necessarily the best way to enforce parallelism constraints (as this
would likely introduce stragglers). One typically wants to reduce
parallelism over a portion (interval?) of a pipeline, whereas
redistribution operates at a point in your pipeline.
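
For illustration, such a fixed-N redistribute would amount to something like
the following Java sketch (hypothetical - not part of the PR; input is
assumed to be a PCollection<String>), which makes both problems visible:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    final int numBuckets = 10;  // the "small N"

    // Bucket elements under N keys, then group. A downstream ParDo is only
    // ~N-way parallel if the runner happens to fuse it with the GroupByKey
    // output, and a hot bucket becomes a straggler.
    PCollection<KV<Integer, Iterable<String>>> buckets = input
        .apply(WithKeys.of(new SerializableFunction<String, Integer>() {
          @Override
          public Integer apply(String element) {
            return ThreadLocalRandom.current().nextInt(numBuckets);
          }
        }))
        .apply(GroupByKey.<Integer, String>create());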

I agree that being able to limit parallelism (possibly dynamically
based on pushback from an external service, or noting that throughput
is no longer scaling linearly) would be a useful feature to have, but
that's a bit out of scope here.


Re: Introducing a Redistribute transform

2016-10-10 Thread Amit Sela
On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw wrote:

> On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela wrote:
>
> > Hi Eugene,
> >
> > This is very interesting.
> > Let me see if I get this right, the "Redistribute" transformation
> > assigns a "running id" key (per-bundle), calls "Redistribute.byKey",
> > and extracts back the values, correct ?
>
> The keys are (pseudorandomly) unique per element.
>
> > As for "Redistribute.byKey" - it's made of a GroupByKey transformation
> > that follows a Window transformation that neutralises the "resolution"
> > of triggers and panes that usually occurs in GroupByKey, correct ?
> >
> > So this is basically a "FanOut" transformation which will depend on the
> > available resources of the runner (and the uniqueness of the assigned
> > keys) ?
> >
> > Would we want to Redistribute into a user-defined number of bundles (>
> > current) ?
>
> I don't think there's any advantage to letting the user specify a
> number here; the data is spread out among as many machines as are
> handling the shuffling (for N elements, there are ~N unique keys,
> which gets partitioned by the system to the M workers).
>
> > How about "FanIn" ?
>
> Could you clarify what you would hope to use this for?
>
Well, what if for some reason I would want to limit parallelism for a step
in the Pipeline ? like calling an external service without "DDoS"ing it ?

> > On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov wrote:
> >
> >> Hello,
> >>
> >> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
> >> introduce a transform called "Redistribute", encapsulating a relatively
> >> common pattern - a "fusion break" [see
> >> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> >> previously providing advice on that] - useful e.g. when you write an IO
> >> as a sequence of ParDo's: split a query into parts, read each part, and
> >> you want to prevent fusing these ParDo's because that would make the
> >> whole thing execute sequentially, and in other similar cases.
> >>
> >> The PR also uses it, as an example, in DatastoreIO and JdbcIO, both of
> >> which used to have a hand-rolled implementation of the same. The Write
> >> transform has something similar, but not quite identical, so I skipped
> >> it.
> >>
> >> This is not a model change - merely providing a common implementation
> >> of something useful that already existed but was scattered across the
> >> codebase.
> >>
> >> Redistribute also subsumes the old mostly-internal Reshuffle transform
> >> via Redistribute.byKey().
> >>
> >> I tried finding more cases in the Beam codebase that have an ad-hoc
> >> implementation of this; I did not find any, but I might have missed
> >> something. I suppose the transform will need to be advertised in
> >> documentation on best-practices for connector development; perhaps some
> >> StackOverflow answers should be updated; any other places?


Re: Jenkins build is still unstable: beam_PostCommit_MavenVerify » Apache Beam :: Examples :: Java #1489

2016-10-10 Thread Pei He
Looking at the broken tests.

On Mon, Oct 10, 2016 at 10:05 AM, Apache Jenkins Server <
jenk...@builds.apache.org> wrote:

> See <https://builds.apache.org/job/beam_PostCommit_MavenVerify/org.apache.beam$beam-examples-java/1489/>
>
>


Re: Introducing a Redistribute transform

2016-10-10 Thread Robert Bradshaw
On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela  wrote:
> Hi Eugene,
>
> This is very interesting.
> Let me see if I get this right, the "Redistribute" transformation assigns
> a "running id" key (per-bundle), calls "Redistribute.byKey", and extracts
> back the values, correct ?

The keys are (pseudorandomly) unique per element.

> As for "Redistribute.byKey" - it's made of a GroupByKey transformation that
> follows a Window transformation that neutralises the "resolution" of
> triggers and panes that usually occurs in GroupByKey, correct ?
>
> So this is basically a "FanOut" transformation which will depend on the
> available resources of the runner (and the uniqueness of the assigned keys)
> ?
>
> Would we want to Redistribute into a user-defined number of bundles (>
> current) ?

I don't think there's any advantage to letting the user specify a
number here; the data is spread out among as many machines as are
handling the shuffling (for N elements, there are ~N unique keys,
which gets partitioned by the system to the M workers).
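
In other words, the arbitrary variant boils down to keying each element
pseudorandomly and reusing the byKey path - roughly like this Java sketch
(illustrative only; the PR's actual implementation and generics may differ):

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Pair each element with a pseudorandom key (~unique per element), go
    // through the byKey path (the fusion/fault-tolerance barrier), drop keys.
    PCollection<String> out = in
        .apply(ParDo.of(new DoFn<String, KV<Long, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
          }
        }))
        .apply(Redistribute.<Long, String>byKey())
        .apply(Values.<String>create());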

> How about "FanIn" ?

Could you clarify what you would hope to use this for?

> On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov
>  wrote:
>
>> Hello,
>>
>> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
>> introduce a transform called "Redistribute", encapsulating a relatively
>> common pattern - a "fusion break" [see
>>
>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>> previously
>> providing advice on that] - useful e.g. when you write an IO as a sequence
>> of ParDo's: split a query into parts, read each part, and you want to
>> prevent fusing these ParDo's because that would make the whole thing
>> execute sequentially, and in other similar cases.
>>
>> The PR also uses it, as an example, in DatastoreIO and JdbcIO, both of
>> which used to have a hand-rolled implementation of the same. The Write
>> transform has something similar, but not quite identical, so I skipped it.
>>
>> This is not a model change - merely providing a common implementation of
>> something useful that already existed but was scattered across the
>> codebase.
>>
>> Redistribute also subsumes the old mostly-internal Reshuffle transform via
>> Redistribute.byKey().
>>
>> I tried finding more cases in the Beam codebase that have an ad-hoc
>> implementation of this; I did not find any, but I might have missed
>> something. I suppose the transform will need to be advertised in
>> documentation on best-practices for connector development; perhaps some
>> StackOverflow answers should be updated; any other places?
>>


Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Amit Sela
That's a good question Robert, and I did.

First of all, an UnboundedSource is split into splits that implement a sort
of "BoundedReadFromUnboundedSource", with Restrictions on time and
(optional) number of records - this seems to fit nicely into the *SDF*
language.

Taking a look at the diagram in the Spark runner's UnboundedSource design
doc, and the diagram placed just above "Restrictions, blocks and positions",
there seems to be a lot of resemblance.

Finally, the idea of reading from within a DoFn is exactly what I'm doing
here, reading from within the mapWithState's "mappingFunction" - a function
that maps: <Source, State> => Iterator of read records.
Mapping a Source (and its State, and possibly restrictions) into a bunch of
read records.
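
For the shape of it, here is a rough Java sketch of such a mappingFunction
against Spark's mapWithState API (StateSpec/Function3/State/Optional are
Spark 2.x Java API; SourceSplit, BeamCheckpoint, ReadResult and
readBoundedFromUnbounded are hypothetical stand-ins for the runner's actual
classes):

    import java.util.List;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    // Key = the split's running id; value = the (wrapped) split source;
    // persistent state = the split's CheckpointMark.
    static JavaMapWithStateDStream<Integer, SourceSplit, BeamCheckpoint, List<String>>
        readSplits(JavaPairDStream<Integer, SourceSplit> splits) {
      Function3<Integer, Optional<SourceSplit>, State<BeamCheckpoint>, List<String>>
          readFn = (splitId, split, state) -> {
            // Resume from the checkpoint persisted for this split id, if any.
            BeamCheckpoint startFrom = state.exists() ? state.get() : null;
            // A bounded read from the unbounded split (capped on time/records).
            ReadResult result = readBoundedFromUnbounded(split.get(), startFrom);
            state.update(result.checkpoint());  // persist for the next micro-batch
            return result.records();
          };
      // 'splits' re-emits the same splits every micro-batch; the actual
      // reading happens inside the mapping function, as described above.
      return splits.mapWithState(StateSpec.function(readFn));
    }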

This all *seems* to be a good fit, but I'll probably have to keep following
closely to see how the API is forming.

Thanks,
Amit

On Mon, Oct 10, 2016 at 8:23 PM Robert Bradshaw wrote:

> Just looking to the future, have you given any thought on how well
> this would work on https://s.apache.org/splittable-do-fn?
>
> On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela  wrote:
> > Thanks Max!
> >
> > I'll try to explain Spark's stateful operators and how/why I used them
> with
> > UnboundedSource.
> >
> > Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> > Since updateStateByKey is bound to output the (updated) state itself -
> the
> > CheckpointMark in our case - we're left with mapWithState.
> > mapWithState provides a persistent, distributed "map-like", that is
> > partitioned according to the stream. This is indeed how I manage state
> > between micro-batches.
> > However, mapWithState (like any map) will give you a value (state)
> > corresponding to a specific key, so I use a running-id from the initial
> > splitting to identify the appropriate state.
> > I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
> > could do the same and save the split source with the CheckpointMark but
> > it'll still have to correspond to the same id, and since I had to wrap
> the
> > split Source to perform a sort of "BoundedReadFromUnboundedSource" I
> simply
> > added an id field and I'm hashing by that id.
> > I'll also add that the stateful operator can only be applied to a
> > (Pair)Stream and not to input operators so I'm actually generating a
> stream
> > of splits (the same ones for every micro-batch) and reading from within
> the
> > mappingFunction of the mapWithState.
> >
> > It's not the simplest design, but given how Spark's persistent state and
> > InputDStream are designed comparing to the Beam model, I don't see
> another
> > way - though I'd be happy to hear one!
> >
> > Pretty sure I've added this here but no harm in adding the link again:
> > design doc
> > <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> > and a work-in-progress branch, all mentioned in the ticket as well.
> > well.
> > The design doc also relates to how "pure" Spark works with Kafka, which I
> > think is interesting and very different from Flink/Dataflow.
> >
> > Hope this helped clear things up a little, please keep on asking if
> > something is not clear yet.
> >
> > Thanks,
> > Amit.
> >
> > On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels wrote:
> >
> > >> Just to add a comment from the Flink side and its
> > >> UnboundedSourceWrapper. We experienced the only way to guarantee
> > >> deterministic splitting of the source, was to generate the splits upon
> > >> creation of the source and then checkpoint the assignment during
> > >> runtime. When restoring from a checkpoint, the same reader
> > >> configuration is restored. It's not possible to change the splitting
> > >> after the initial splitting has taken place. However, Flink will soon
> > >> be able to repartition the operator state upon restart/rescaling of a
> > >> job.
> > >>
> > >> Does Spark have a way to pass state of a previous mini batch to the
> > >> current mini batch? If so, you could restore the last configuration
> > >> and continue reading from the checkpointed offset. You just have to
> > >> checkpoint before the mini batch ends.
> > >>
> > >> -Max
> > >>
> > >> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré wrote:
> > >>
> > >> > Hi Amit,
> > >> >
> > >> > thanks for the explanation.
> > >> >
> > >> > For 4, you are right, it's slightly different from DataXchange
> > >> > (related to the elements in the PCollection). I think storing the
> > >> > "starting point" for a reader makes sense.
> > >> >
> > >> > Regards
> > >> > JB
> > >> >
> > >> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> > >> >>

Re: [PROPOSAL] Introduce review mailing list and provide update on open discussion

2016-10-10 Thread Jean-Baptiste Onofré

Thanks for the update Frances.

I will ping my infra contact to move forward quickly.

Regards
JB

On 10/10/2016 07:27 PM, Frances Perry wrote:

Related to #3-5: Also, as we discussed earlier [1], there will be an
additional level of tracking in jira for deeper proposal-style
conversations to help us keep track of which ones are still under
discussion on the dev@ list (which, as usual, remains the source of truth).

The details are still in a pull request [2], which is blocked on a creation
of the new JIRA workflow [3].

[1]
https://lists.apache.org/thread.html/f030424d9f9de6ff7510ff444280a64bdaa0ac430066e96df0c43fe8@%3Cdev.beam.apache.org%3E
[2] https://github.com/apache/incubator-beam-site/pull/42
[3] https://issues.apache.org/jira/browse/INFRA-12698

On Sun, Oct 9, 2016 at 10:55 PM, Jean-Baptiste Onofré wrote:


Hi Max and the others,

For 5, it was more the idea to have an agreement on a proposal. 2 weeks
without any feedback (it's not two weeks "static") is just an idea. The
discussion can be extended for as long as we want if there are still some
discussions.

Agree on 4, it's just a best effort. The idea is to provide kind of
summary sometime in order to hide some details and technical complexity
that don't always bring value from a community perspective.

I will send an update later today about what we discussed.

Thanks
Regards
JB


On 10/07/2016 10:58 AM, Maximilian Michels wrote:


Hi JB!

1. We create a new mailing list: rev...@beam.incubator.apache.org.

2. We configure github integration to send all pull request comments on
review mailing list. It would allow to track and simplify the way to read
the comments and to keep up to date.



I already have it organized that way through filters but having a
dedicated mailing list is a much better idea.

3. A technical discussion should be sent on dev mailing list with the
[DISCUSS] keyword in the subject.
4. Once a discussion is open, the author should periodically send an
update on the discussion (once a week) containing a summary of the last
exchanges that happened on the Jira or github (quick and direct summary).



We can try that on a best-effort basis. Enforcing this seems to be
difficult and could also introduce verbosity on the mailing list.

5. Once we consider the discussion closed (no update in the last two
weeks), the author sends a [CLOSE] e-mail on the thread.



I think it is hard to decide when a discussion is closed. Two weeks
seems like a too short amount of time.

In general, +1 for an open development process.

-Max

On Fri, Oct 7, 2016 at 4:05 AM, Jungtaek Lim  wrote:


+1 except [4] for me, too. [4] may be replaced with linking DISCUSSION
mail
thread archive to JIRA.
Yes it doesn't update news on discussion to JIRA and/or Github, but at
least someone needed to see can find out manually.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Oct 7, 2016 at 11:00 AM, Satish Duggana wrote:

+1 for proposal except for [4]. Agree with Raghu on [4] as it may be
burdensome to update with summaries and folks may start replying comments
on those summaries etc and conclusions are updated on respective design
docs. We may want to start without [4].

Thanks,
Satish.

On Fri, Oct 7, 2016 at 12:00 AM, Raghu Angadi wrote:

+1 for rev...@beam.incubator.apache.org. Open lists are critically
important.

My comment earlier was mainly about (4). Sorry about not being clear.

On Thu, Oct 6, 2016 at 11:00 AM, Lukasz Cwik wrote:

+1 for supporting different working styles.

On Thu, Oct 6, 2016 at 10:58 AM, Kenneth Knowles wrote:

+1 to rev...@beam.incubator.apache.org if it is turnkey for infra to
set up, aka points 1 and 2.

Even though I would not personally read it via email, getting the
information in yet another format and infrastructure (and stewardship)
is valuable for search, archival, and supporting diverse work styles.
The benefit might not be huge, but I think it will be enough to justify
the (hopefully negligible) cost.

Kenn

On Thu, Oct 6, 2016 at 4:54 AM Jean-Baptiste Onofré wrote:

Hi team,

following the discussion we had about technical discussion that should
happen on the mailing list, I would like to propose the following:

1. We create a new mailing list: rev...@beam.incubator.apache.org.
2. We configure github integration to send all pull request comments on
review mailing list. It would allow to track and simplify the way to
read the comments and to keep up to date.
3. A technical discussion should be sent on dev mailing list with the
[DISCUSS] keyword in the subject.
4. Once a discussion is open, the author should periodically send an
update on the discussion (once a week) containing a summary of the last
exchanges that happened on the Jira or github (quick and direct summary).
5. Once we consider the discussion closed (no update in the last two
weeks), the author sends a [CLOSE] e-mail on the thread.

WDYT ?

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net

Re: [PROPOSAL] Introduce review mailing list and provide update on open discussion

2016-10-10 Thread Frances Perry
Related to #3-5: Also, as we discussed earlier [1], there will be an
additional level of tracking in jira for deeper proposal-style
conversations to help us keep track of which ones are still under
discussion on the dev@ list (which, as usual, remains the source of truth).

The details are still in a pull request [2], which is blocked on a creation
of the new JIRA workflow [3].

[1]
https://lists.apache.org/thread.html/f030424d9f9de6ff7510ff444280a64bdaa0ac430066e96df0c43fe8@%3Cdev.beam.apache.org%3E
[2] https://github.com/apache/incubator-beam-site/pull/42
[3] https://issues.apache.org/jira/browse/INFRA-12698

On Sun, Oct 9, 2016 at 10:55 PM, Jean-Baptiste Onofré wrote:

> Hi Max and the others,
>
> For 5, it was more the idea to have an agreement on a proposal. 2 weeks
> without any feedback (it's not two weeks "static") is just an idea. The
> discussion can be extended for as long as we want if there are still some
> discussions.
>
> Agree on 4, it's just a best effort. The idea is to provide kind of
> summary sometime in order to hide some details and technical complexity
> that don't always bring value from a community perspective.
>
> I will send an update later today about what we discussed.
>
> Thanks
> Regards
> JB
>
>
> On 10/07/2016 10:58 AM, Maximilian Michels wrote:
>
>> Hi JB!
>>
>> 1. We create a new mailing list: rev...@beam.incubator.apache.org.
>>> 2. We configure github integration to send all pull request comments on
>>> review mailing list. It would allow to track and simplify the way to read
>>> the comments and to keep up to date.
>>>
>>
>> I already have it organized that way through filters but having a
>> dedicated mailing list is a much better idea.
>>
>>> 3. A technical discussion should be sent on dev mailing list with the
>>> [DISCUSS] keyword in the subject.
>>> 4. Once a discussion is open, the author should periodically send an
>>> update on the discussion (once a week) containing a summary of the last
>>> exchanges that happened on the Jira or github (quick and direct summary).
>>>
>>
>> We can try that on a best-effort basis. Enforcing this seems to be
>> difficult and could also introduce verbosity on the mailing list.
>>
>>> 5. Once we consider the discussion closed (no update in the last two
>>> weeks), the author sends a [CLOSE] e-mail on the thread.
>>>
>>
>> I think it is hard to decide when a discussion is closed. Two weeks
>> seems like a too short amount of time.
>>
>> In general, +1 for an open development process.
>>
>> -Max
>>
>> On Fri, Oct 7, 2016 at 4:05 AM, Jungtaek Lim wrote:
>>
>>> +1 except [4] for me, too. [4] may be replaced with linking DISCUSSION
>>> mail thread archive to JIRA.
>>> Yes it doesn't update news on discussion to JIRA and/or Github, but at
>>> least someone needed to see can find out manually.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Fri, Oct 7, 2016 at 11:00 AM, Satish Duggana wrote:
>>>
>>>> +1 for proposal except for [4]. Agree with Raghu on [4] as it may be
>>>> burdensome to update with summaries and folks may start replying
>>>> comments on those summaries etc and conclusions are updated on
>>>> respective design docs. We may want to start without [4].
>>>>
>>>> Thanks,
>>>> Satish.
>>>>
>>>> On Fri, Oct 7, 2016 at 12:00 AM, Raghu Angadi wrote:
>>>>
>>>>> +1 for rev...@beam.incubator.apache.org. Open lists are critically
>>>>> important.
>>>>>
>>>>> My comment earlier was mainly about (4). Sorry about not being clear.
>>>>>
>>>>> On Thu, Oct 6, 2016 at 11:00 AM, Lukasz Cwik wrote:
>>>>>
>>>>>> +1 for supporting different working styles.
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 10:58 AM, Kenneth Knowles wrote:
>>>>>>
>>>>>>> +1 to rev...@beam.incubator.apache.org if it is turnkey for infra
>>>>>>> to set up, aka points 1 and 2.
>>>>>>>
>>>>>>> Even though I would not personally read it via email, getting the
>>>>>>> information in yet another format and infrastructure (and
>>>>>>> stewardship) is valuable for search, archival, and supporting
>>>>>>> diverse work styles. The benefit might not be huge, but I think it
>>>>>>> will be enough to justify the (hopefully negligible) cost.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 4:54 AM Jean-Baptiste Onofré wrote:
>>>>>>>
>>>>>>>> Hi team,
>>>>>>>>
>>>>>>>> following the discussion we had about technical discussion that
>>>>>>>> should happen on the mailing list, I would like to propose the
>>>>>>>> following:
>>>>>>>>
>>>>>>>> 1. We create a new mailing list: rev...@beam.incubator.apache.org.
>>>>>>>> 2. We configure github integration to send all pull request
>>>>>>>> comments on review mailing list. It would allow to track and
>>>>>>>> simplify the way to read the comments and to keep up to date.
>>>>>>>> 3. A technical discussion should be sent on dev mailing list with
>>>>>>>> the

Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Robert Bradshaw
Just looking to the future, have you given any thought on how well
this would work on https://s.apache.org/splittable-do-fn?

On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela  wrote:
> Thanks Max!
>
> I'll try to explain Spark's stateful operators and how/why I used them with
> UnboundedSource.
>
> Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> Since updateStateByKey is bound to output the (updated) state itself - the
> CheckpointMark in our case - we're left with mapWithState.
> mapWithState provides a persistent, distributed "map-like", that is
> partitioned according to the stream. This is indeed how I manage state
> between micro-batches.
> However, mapWithState (like any map) will give you a value (state)
> corresponding to a specific key, so I use a running-id from the initial
> splitting to identify the appropriate state.
> I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
> could do the same and save the split source with the CheckpointMark but
> it'll still have to correspond to the same id, and since I had to wrap the
> split Source to perform a sort of "BoundedReadFromUnboundedSource" I simply
> added an id field and I'm hashing by that id.
> I'll also add that the stateful operator can only be applied to a
> (Pair)Stream and not to input operators so I'm actually generating a stream
> of splits (the same ones for every micro-batch) and reading from within the
> mappingFunction of the mapWithState.
>
> It's not the simplest design, but given how Spark's persistent state and
> InputDStream are designed comparing to the Beam model, I don't see another
> way - though I'd be happy to hear one!
>
> Pretty sure I've added this here but no harm in adding the link again:
> design doc
> <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> and a work-in-progress branch, all mentioned in the ticket as well.
> The design doc also relates to how "pure" Spark works with Kafka, which I
> think is interesting and very different from Flink/Dataflow.
>
> Hope this helped clear things up a little, please keep on asking if
> something is not clear yet.
>
> Thanks,
> Amit.
>
> On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels  wrote:
>
>> Just to add a comment from the Flink side and its
>> UnboundedSourceWrapper. We experienced the only way to guarantee
>> deterministic splitting of the source, was to generate the splits upon
>> creation of the source and then checkpoint the assignment during
>> runtime. When restoring from a checkpoint, the same reader
>> configuration is restored. It's not possible to change the splitting
>> after the initial splitting has taken place. However, Flink will soon
>> be able to repartition the operator state upon restart/rescaling of a
>> job.
>>
>> Does Spark have a way to pass state of a previous mini batch to the
>> current mini batch? If so, you could restore the last configuration
>> and continue reading from the checkpointed offset. You just have to
>> checkpoint before the mini batch ends.
>>
>> -Max
>>
>> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré wrote:
>>
>> > Hi Amit,
>> >
>> > thanks for the explanation.
>> >
>> > For 4, you are right, it's slightly different from DataXchange
>> > (related to the elements in the PCollection). I think storing the
>> > "starting point" for a reader makes sense.
>> >
>> > Regards
>> > JB
>> >
>> > On 10/10/2016 10:33 AM, Amit Sela wrote:
>> >>
>> >> Inline, thanks JB!
>> >>
>> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré wrote:
>> >>
>> >>> Hi Amit,
>> >>>
>> >>> For 1., the runner is responsible of the checkpoint storage
>> >>> (associated with the source). It's the way for the runner to retry
>> >>> and know the failed bundles.
>> >>>
>> >> True, this was a recap/summary of another, not-so-clear, thread.
>> >>
>> >>> For 4, are you proposing that KafkaRecord store additional metadata
>> >>> for that ? It sounds like what I proposed in the "Technical Vision"
>> >>> appendix document: there I proposed to introduce a DataXchange object
>> >>> that store some additional metadata (like offset) used by the runner.
>> >>> It would be the same with SDF as the tracker state should be
>> >>> persistent as well.
>> >>>
>> >> I think I was more focused on persisting the "starting point" for a
>> >> reader, even if no records were read (yet), so that the next time the
>> >> reader attempts to read it will pick up there. This has more to do
>> >> with how the CheckpointMark handles this.
>> >> I have to say that I'm not familia

Jenkins build is still unstable: beam_Release_NightlySnapshot #195

2016-10-10 Thread Apache Jenkins Server
See 



Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Amit Sela
Thanks Max!

I'll try to explain Spark's stateful operators and how/why I used them with
UnboundedSource.

Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
Since updateStateByKey is bound to output the (updated) state itself - the
CheckpointMark in our case - we're left with mapWithState.
mapWithState provides a persistent, distributed "map-like", that is
partitioned according to the stream. This is indeed how I manage state
between micro-batches.
However, mapWithState (like any map) will give you a value (state)
corresponding to a specific key, so I use a running-id from the initial
splitting to identify the appropriate state.
I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
could do the same and save the split source with the CheckpointMark but
it'll still have to correspond to the same id, and since I had to wrap the
split Source to perform a sort of "BoundedReadFromUnboundedSource" I simply
added an id field and I'm hashing by that id.
I'll also add that the stateful operator can only be applied to a
(Pair)Stream and not to input operators so I'm actually generating a stream
of splits (the same ones for every micro-batch) and reading from within the
mappingFunction of the mapWithState.

It's not the simplest design, but given how Spark's persistent state and
InputDStream are designed compared to the Beam model, I don't see another
way - though I'd be happy to hear one!

Pretty sure I've added this here but no harm in adding the link again:
design doc
<https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
and a work-in-progress branch, all mentioned in the ticket as well.
The design doc also relates to how "pure" Spark works with Kafka, which I
think is interesting and very different from Flink/Dataflow.

Hope this helped clear things up a little, please keep on asking if
something is not clear yet.

Thanks,
Amit.

On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels  wrote:

> Just to add a comment from the Flink side and its
> UnboundedSourceWrapper. We experienced the only way to guarantee
> deterministic splitting of the source, was to generate the splits upon
> creation of the source and then checkpoint the assignment during
> runtime. When restoring from a checkpoint, the same reader
> configuration is restored. It's not possible to change the splitting
> after the initial splitting has taken place. However, Flink will soon
> be able to repartition the operator state upon restart/rescaling of a
> job.
>
> Does Spark have a way to pass state of a previous mini batch to the
> current mini batch? If so, you could restore the last configuration
> and continue reading from the checkpointed offset. You just have to
> checkpoint before the mini batch ends.
>
> -Max
>
> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré wrote:
>
> > Hi Amit,
> >
> > thanks for the explanation.
> >
> > For 4, you are right, it's slightly different from DataXchange
> > (related to the elements in the PCollection). I think storing the
> > "starting point" for a reader makes sense.
> >
> > Regards
> > JB
> >
> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> >>
> >> Inline, thanks JB!
> >>
> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré wrote:
> >>
> >>> Hi Amit,
> >>>
> >>> For 1., the runner is responsible of the checkpoint storage
> >>> (associated with the source). It's the way for the runner to retry
> >>> and know the failed bundles.
> >>>
> >> True, this was a recap/summary of another, not-so-clear, thread.
> >>
> >>> For 4, are you proposing that KafkaRecord store additional metadata
> >>> for that ? It sounds like what I proposed in the "Technical Vision"
> >>> appendix document: there I proposed to introduce a DataXchange object
> >>> that store some additional metadata (like offset) used by the runner.
> >>> It would be the same with SDF as the tracker state should be
> >>> persistent as well.
> >>>
> >> I think I was more focused on persisting the "starting point" for a
> >> reader, even if no records were read (yet), so that the next time the
> >> reader attempts to read it will pick up there. This has more to do
> >> with how the CheckpointMark handles this.
> >> I have to say that I'm not familiar with your DataXchange proposal, I
> >> will take a look though.
> >>
> >>> Regards
> >>> JB
> >>>
> >>> On 10/08/2016 01:55 AM, Amit Sela wrote:
> >>>
> >>>> I started a thread about (suggesting) UnboundedSource splitId's and
> >>>> it turned into an UnboundedSource/KafkaIO discussion, and I think
> >>>> it's best

Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Jean-Baptiste Onofré

Hi Max,

thanks for the explanation and it makes a lot of sense.

Not sure it will be so simple to store a previous state from one 
micro-batch to another. Let me take a look with Amit.


Regards
JB

On 10/10/2016 03:02 PM, Maximilian Michels wrote:

Just to add a comment from the Flink side and its
UnboundedSourceWrapper. We experienced the only way to guarantee
deterministic splitting of the source, was to generate the splits upon
creation of the source and then checkpoint the assignment during
runtime. When restoring from a checkpoint, the same reader
configuration is restored. It's not possible to change the splitting
after the initial splitting has taken place. However, Flink will soon
be able to repartition the operator state upon restart/rescaling of a
job.

Does Spark have a way to pass state of a previous mini batch to the
current mini batch? If so, you could restore the last configuration
and continue reading from the checkpointed offset. You just have to
checkpoint before the mini batch ends.

-Max

On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré wrote:

Hi Amit,

thanks for the explanation.

For 4, you are right, it's slightly different from DataXchange (related to
the elements in the PCollection). I think storing the "starting point" for a
reader makes sense.

Regards
JB


On 10/10/2016 10:33 AM, Amit Sela wrote:


Inline, thanks JB!

On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré wrote:

Hi Amit,

For 1., the runner is responsible of the checkpoint storage (associated
with the source). It's the way for the runner to retry and know the
failed bundles.

True, this was a recap/summary of another, not-so-clear, thread.

For 4, are you proposing that KafkaRecord store additional metadata for
that ? It sounds like what I proposed in the "Technical Vision" appendix
document: there I proposed to introduce a DataXchange object that store
some additional metadata (like offset) used by the runner. It would be
the same with SDF as the tracker state should be persistent as well.

I think I was more focused on persisting the "starting point" for a reader,
even if no records were read (yet), so that the next time the reader
attempts to read it will pick up there. This has more to do with how the
CheckpointMark handles this.
I have to say that I'm not familiar with your DataXchange proposal, I will
take a look though.

Regards
JB

On 10/08/2016 01:55 AM, Amit Sela wrote:

I started a thread about (suggesting) UnboundedSource splitId's and it
turned into an UnboundedSource/KafkaIO discussion, and I think it's best
to start over in a clear [DISCUSS] thread.

When working on UnboundedSource support for the Spark runner, I've raised
some questions, some were general-UnboundedSource, and others
Kafka-specific.

I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.

   1. UnboundedSource id's - I assume any runner persists the
   UnboundedSources's CheckpointMark for fault-tolerance, but I wonder
   how it matches the appropriate split (of the UnboundedSource) to its
   previously persisted CheckpointMark in any specific worker ?
   *Thomas Groh* mentioned that Source splits have to have an associated
   identifier, and so the runner gets to tag splits however it pleases,
   so long as those tags don't allow splits to bleed into each other.
   2. Consistent splitting - an UnboundedSource splitting seems to
   require consistent splitting if it were to "pick-up where it left",
   correct ? this is not mentioned as a requirement or a recommendation
   in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
   issue ?
   *Raghu Angadi* mentioned that Kafka already does so by applying
   partitions to readers in a round-robin manner.
   *Thomas Groh* also added that while the UnboundedSource API doesn't
   require deterministic splitting (although it's recommended), a
   PipelineRunner should keep track of the initially generated splits.
   3. Support reading of Kafka partitions that were added to topic/s
   while a Pipeline reads from them - BEAM-727 was filed.
   4. Reading/persisting Kafka start offsets - since Spark works in
   micro-batches, if "latest" was applied on a fairly sparse topic each
   worker would actually begin reading only after it saw a message
   during the time window it had to read messages. This is because
   fetching the offsets is done by the worker running the Reader. This
   means that each Reader sees a different state of "latest" (for his
   partition/s), such that a failing Reader that hasn't read yet might
   fetch a different "latest" once it's recovered than what it
   originally fetched. While this may not be as painful for other
   runners, IMHO it lacks correctness and I'd sug

Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Maximilian Michels
Just to add a comment from the Flink side and its
UnboundedSourceWrapper. We experienced the only way to guarantee
deterministic splitting of the source, was to generate the splits upon
creation of the source and then checkpoint the assignment during
runtime. When restoring from a checkpoint, the same reader
configuration is restored. It's not possible to change the splitting
after the initial splitting has taken place. However, Flink will soon
be able to repartition the operator state upon restart/rescaling of a
job.
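
Runner-agnostically, that bookkeeping looks roughly like the sketch below
(UnboundedSource, CheckpointMark and generateInitialSplits are the Beam API;
the wrapper class itself is hypothetical, not Flink's actual implementation):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Splits are generated once, up front, so the assignment is
    // deterministic; what gets checkpointed at runtime is the per-split
    // CheckpointMark, restored on recovery.
    class SourceWrapper<T, C extends UnboundedSource.CheckpointMark>
        implements Serializable {
      private final List<? extends UnboundedSource<T, C>> splits;
      private final List<C> marks;

      SourceWrapper(UnboundedSource<T, C> source, int parallelism,
          PipelineOptions options) throws Exception {
        this.splits = source.generateInitialSplits(parallelism, options);
        this.marks = new ArrayList<>(
            Collections.nCopies(splits.size(), (C) null));
      }

      // Runner checkpoint hook: snapshot the (split -> mark) assignment.
      List<C> snapshotState() {
        return new ArrayList<>(marks);
      }

      // On restore, reader i resumes via
      // splits.get(i).createReader(options, marks.get(i)).
      void restoreState(List<C> restored) {
        for (int i = 0; i < restored.size(); i++) {
          marks.set(i, restored.get(i));
        }
      }
    }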

Does Spark have a way to pass state of a previous mini batch to the
current mini batch? If so, you could restore the last configuration
and continue reading from the checkpointed offset. You just have to
checkpoint before the mini batch ends.

-Max

On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré wrote:
> Hi Amit,
>
> thanks for the explanation.
>
> For 4, you are right, it's slightly different from DataXchange (related to
> the elements in the PCollection). I think storing the "starting point" for a
> reader makes sense.
>
> Regards
> JB
>
>
> On 10/10/2016 10:33 AM, Amit Sela wrote:
>>
>> Inline, thanks JB!
>>
>> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré wrote:
>>
>>> Hi Amit,
>>>
>>> For 1., the runner is responsible of the checkpoint storage (associated
>>> with the source). It's the way for the runner to retry and know the
>>> failed bundles.
>>>
>> True, this was a recap/summary of another, not-so-clear, thread.
>>
>>> For 4, are you proposing that KafkaRecord store additional metadata for
>>> that ? It sounds like what I proposed in the "Technical Vision" appendix
>>> document: there I proposed to introduce a DataXchange object that store
>>> some additional metadata (like offset) used by the runner. It would be
>>> the same with SDF as the tracker state should be persistent as well.
>>>
>> I think I was more focused on persisting the "starting point" for a
>> reader, even if no records were read (yet), so that the next time the
>> reader attempts to read it will pick up there. This has more to do with
>> how the CheckpointMark handles this.
>> I have to say that I'm not familiar with your DataXchange proposal, I
>> will take a look though.
>>
>>> Regards
>>> JB
>>>
>>> On 10/08/2016 01:55 AM, Amit Sela wrote:
>>>
>>>> I started a thread about (suggesting) UnboundedSource splitId's and it
>>>> turned into an UnboundedSource/KafkaIO discussion, and I think it's
>>>> best to start over in a clear [DISCUSS] thread.
>>>>
>>>> When working on UnboundedSource support for the Spark runner, I've
>>>> raised some questions, some were general-UnboundedSource, and others
>>>> Kafka-specific.
>>>>
>>>> I'd like to recap them here, and maybe have a more productive and
>>>> well-documented discussion for everyone.
>>>>
>>>>    1. UnboundedSource id's - I assume any runner persists the
>>>>    UnboundedSources's CheckpointMark for fault-tolerance, but I wonder
>>>>    how it matches the appropriate split (of the UnboundedSource) to its
>>>>    previously persisted CheckpointMark in any specific worker ?
>>>>    *Thomas Groh* mentioned that Source splits have to have an
>>>>    associated identifier, and so the runner gets to tag splits however
>>>>    it pleases, so long as those tags don't allow splits to bleed into
>>>>    each other.
>>>>    2. Consistent splitting - an UnboundedSource splitting seems to
>>>>    require consistent splitting if it were to "pick-up where it left",
>>>>    correct ? this is not mentioned as a requirement or a recommendation
>>>>    in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
>>>>    issue ?
>>>>    *Raghu Angadi* mentioned that Kafka already does so by applying
>>>>    partitions to readers in a round-robin manner.
>>>>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>>>>    require deterministic splitting (although it's recommended), a
>>>>    PipelineRunner should keep track of the initially generated splits.
>>>>    3. Support reading of Kafka partitions that were added to topic/s
>>>>    while a Pipeline reads from them - BEAM-727 was filed.
>>>>    4. Reading/persisting Kafka start offsets - since Spark works in
>>>>    micro-batches, if "latest" was applied on a fairly sparse topic each
>>>>    worker would actually begin reading only after it saw a message
>>>>    during the time window it had to read messages. This is because
>>>>    fetching the offsets is done by the worker running the Reader. This
>>>>    means that each Reader sees a

Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Jean-Baptiste Onofré

Hi Amit,

thanks for the explanation.

For 4, you are right, it's slightly different from DataXchange (related 
to the elements in the PCollection). I think storing the "starting 
point" for a reader makes sense.


Regards
JB

On 10/10/2016 10:33 AM, Amit Sela wrote:

Inline, thanks JB!

On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré wrote:

Hi Amit,

For 1., the runner is responsible of the checkpoint storage (associated
with the source). It's the way for the runner to retry and know the
failed bundles.

True, this was a recap/summary of another, not-so-clear, thread.

For 4, are you proposing that KafkaRecord store additional metadata for
that ? It sounds like what I proposed in the "Technical Vision" appendix
document: there I proposed to introduce a DataXchange object that store
some additional metadata (like offset) used by the runner. It would be
the same with SDF as the tracker state should be persistent as well.

I think I was more focused on persisting the "starting point" for a reader,
even if no records were read (yet), so that the next time the reader
attempts to read it will pick up there. This has more to do with how the
CheckpointMark handles this.
I have to say that I'm not familiar with your DataXchange proposal, I will
take a look though.

Regards
JB

On 10/08/2016 01:55 AM, Amit Sela wrote:

I started a thread about (suggesting) UnboundedSource splitId's and it
turned into an UnboundedSource/KafkaIO discussion, and I think it's best
to start over in a clear [DISCUSS] thread.

When working on UnboundedSource support for the Spark runner, I've raised
some questions, some were general-UnboundedSource, and others
Kafka-specific.

I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.

   1. UnboundedSource id's - I assume any runner persists the
   UnboundedSources's CheckpointMark for fault-tolerance, but I wonder
   how it matches the appropriate split (of the UnboundedSource) to its
   previously persisted CheckpointMark in any specific worker ?
   *Thomas Groh* mentioned that Source splits have to have an associated
   identifier, and so the runner gets to tag splits however it pleases,
   so long as those tags don't allow splits to bleed into each other.
   2. Consistent splitting - an UnboundedSource splitting seems to
   require consistent splitting if it were to "pick-up where it left",
   correct ? this is not mentioned as a requirement or a recommendation
   in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
   issue ?
   *Raghu Angadi* mentioned that Kafka already does so by applying
   partitions to readers in a round-robin manner.
   *Thomas Groh* also added that while the UnboundedSource API doesn't
   require deterministic splitting (although it's recommended), a
   PipelineRunner should keep track of the initially generated splits.
   3. Support reading of Kafka partitions that were added to topic/s
   while a Pipeline reads from them - BEAM-727 was filed.
   4. Reading/persisting Kafka start offsets - since Spark works in
   micro-batches, if "latest" was applied on a fairly sparse topic each
   worker would actually begin reading only after it saw a message
   during the time window it had to read messages. This is because
   fetching the offsets is done by the worker running the Reader. This
   means that each Reader sees a different state of "latest" (for his
   partition/s), such that a failing Reader that hasn't read yet might
   fetch a different "latest" once it's recovered than what it
   originally fetched. While this may not be as painful for other
   runners, IMHO it lacks correctness and I'd suggest either reading
   Kafka metadata of the Kafka cluster once upon initial splitting, or
   add some of it to the CheckpointMark. Filed BEAM-704.

The original thread is called "Should UnboundedSource provide a split
identifier ?".

While the only specific implementation of UnboundedSource discussed here
is Kafka, it is probably the most popular open-source UnboundedSource.
Having said that, I wonder where this meets PubSub ? or any other
UnboundedSource that those questions might affect.

Thanks,
Amit

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Amit Sela
Inline, thanks JB!

On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré wrote:

> Hi Amit,
>
> For 1., the runner is responsible of the checkpoint storage (associated
> with the source). It's the way for the runner to retry and know the
> failed bundles.
>
True, this was a recap/summary of another, not-so-clear, thread.

> For 4, are you proposing that KafkaRecord store additional metadata for
> that ? It sounds like what I proposed in the "Technical Vision" appendix
> document: there I proposed to introduce a DataXchange object that store
> some additional metadata (like offset) used by the runner. It would be
> the same with SDF as the tracker state should be persistent as well.
>
I think I was more focused on persisting the "starting point" for a reader,
even if no records were read (yet), so that the next time the reader
attempts to read it will pick up there. This has more to do with how the
CheckpointMark handles this.
I have to say that I'm not familiar with your DataXchange proposal, I will
take a look though.

> Regards
> JB
>
> On 10/08/2016 01:55 AM, Amit Sela wrote:
>
> > I started a thread about (suggesting) UnboundedSource splitId's and it
> > turned into an UnboundedSource/KafkaIO discussion, and I think it's
> > best to start over in a clear [DISCUSS] thread.
> >
> > When working on UnboundedSource support for the Spark runner, I've
> > raised some questions, some were general-UnboundedSource, and others
> > Kafka-specific.
> >
> > I'd like to recap them here, and maybe have a more productive and
> > well-documented discussion for everyone.
> >
> >    1. UnboundedSource id's - I assume any runner persists the
> >    UnboundedSources's CheckpointMark for fault-tolerance, but I wonder
> >    how it matches the appropriate split (of the UnboundedSource) to its
> >    previously persisted CheckpointMark in any specific worker ?
> >    *Thomas Groh* mentioned that Source splits have to have an
> >    associated identifier, and so the runner gets to tag splits however
> >    it pleases, so long as those tags don't allow splits to bleed into
> >    each other.
> >    2. Consistent splitting - an UnboundedSource splitting seems to
> >    require consistent splitting if it were to "pick-up where it left",
> >    correct ? this is not mentioned as a requirement or a recommendation
> >    in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
> >    issue ?
> >    *Raghu Angadi* mentioned that Kafka already does so by applying
> >    partitions to readers in a round-robin manner.
> >    *Thomas Groh* also added that while the UnboundedSource API doesn't
> >    require deterministic splitting (although it's recommended), a
> >    PipelineRunner should keep track of the initially generated splits.
> >    3. Support reading of Kafka partitions that were added to topic/s
> >    while a Pipeline reads from them - BEAM-727 was filed.
> >    4. Reading/persisting Kafka start offsets - since Spark works in
> >    micro-batches, if "latest" was applied on a fairly sparse topic each
> >    worker would actually begin reading only after it saw a message
> >    during the time window it had to read messages. This is because
> >    fetching the offsets is done by the worker running the Reader. This
> >    means that each Reader sees a different state of "latest" (for his
> >    partition/s), such that a failing Reader that hasn't read yet might
> >    fetch a different "latest" once it's recovered than what it
> >    originally fetched. While this may not be as painful for other
> >    runners, IMHO it lacks correctness and I'd suggest either reading
> >    Kafka metadata of the Kafka cluster once upon initial splitting, or
> >    add some of it to the CheckpointMark. Filed BEAM-704.
> >
> > The original thread is called "Should UnboundedSource provide a split
> > identifier ?".
> >
> > While the only specific implementation of UnboundedSource discussed
> > here is Kafka, it is probably the most popular open-source
> > UnboundedSource. Having said that, I wonder where this meets PubSub ?
> > or any other UnboundedSource that those questions might affect.
> >
> > Thanks,
> > Amit
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com