Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
Ok, let me try to step back and summarize what we have today and what I am missing:

1. we can handle chunking in Beam through group-into-batches (or equivalent), but:
   > it is not built into the transforms (IO) and it is controlled
from outside the transforms, so there is no way for a transform to do it
properly without itself handling a composition and links between
multiple DoFns to get notifications, react properly, or
handle backpressure from its backend (see the sketch at the end of this message)
2. there is no restart feature, because there is no real state handling
at the moment. This seems fully delegated to the runner, but I was
hoping for more guarantees from the API so that a pipeline can be restarted
(mainly for batch, since for streams it can be irrelevant or delegated to
the backend) and reprocess only the records that were not committed. That
requires some persistence outside the main IO storages to do it
properly.
   > note this is somewhat similar to the monitoring topic, which also lacks
persistence at the moment, so Beam may end up needing a pluggable storage
for a few concerns


Short term I would be happy with 1 solved properly; long term I hope 2
can be tackled without workarounds that require custom wrapping of IOs to
plug in a custom state persistence.
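
Here is that sketch - a minimal, illustrative example of handling point 1 from
outside the transform with GroupIntoBatches. The keys, the batch size and the
printed "write" are placeholders, not a real IO:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

public class ChunkedWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of(KV.of("shard", "a"), KV.of("shard", "b"), KV.of("shard", "c")))
        // The chunking lives outside the IO: GroupIntoBatches emits
        // KV<key, Iterable<value>> elements of up to the requested size.
        .apply("ChunkPerKey", GroupIntoBatches.<String, String>ofSize(2))
        .apply("WriteChunk", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Placeholder for the backend write, one call per chunk. The IO
            // itself has no say in how the chunk was formed, which is point 1.
            System.out.println("flush chunk: " + c.element().getValue());
          }
        }));

    p.run().waitUntilFinish();
  }
}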



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 7:44 GMT+01:00 Jean-Baptiste Onofré :
> Thanks for the explanation. Agree, we might talk about different things
> using the same wording.
>
> I'm also struggling to understand the use case (for a generic DoFn).
>
> Regards
> JB
>
>
> On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:
>>
>> To avoid spending a lot of time pursuing a false path, I'd like to say
>> straight up that SDF is definitely not going to help here, despite the
>> fact
>> that its API includes the term "checkpoint". In SDF, the "checkpoint"
>> captures the state of processing within a single element. If you're
>> applying an SDF to 1000 elements, it will, like any other DoFn, be applied
>> to each of them independently and in parallel, and you'll have 1000
>> checkpoints capturing the state of processing each of these elements,
>> which
>> is probably not what you want.
>>
>> I'm afraid I still don't understand what kind of checkpoint you need, if
>> it
>> is not just deterministic grouping into batches. "Checkpoint" is a very
>> broad term and it's very possible that everybody in this thread is talking
>> about different things when saying it. So it would help if you could give
>> a
>> more concrete example: for example, take some IO that you think could be
>> easier to write with your proposed API, give the contents of a
>> hypothetical
>> PCollection being written to this IO, give the code of a hypothetical DoFn
>> implementing the write using your API, and explain what you'd expect to
>> happen at runtime.
>>
>> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
>> 
>> wrote:
>>
>>> @Eugene: yes and the other alternative of Reuven too but it is still
>>> 1. relying on timers, 2. not really checkpointed
>>>
>>> In other words it seems all solutions are to create a chunk of size 1
>>> and replayable to fake the lack of chunking in the framework. This
>>> always implies a chunk handling outside the component (typically
>>> before for an output). My point is I think IO need it in their own
>>> "internal" or at least control it themselves since the chunk size is
>>> part of the IO handling most of the time.
>>>
>>> I think JB spoke of the same "group before" trick using restrictions
>>> which can work I have to admit if SDF are implemented by runners. Is
>>> there a roadmap/status on that? Last time I checked SDF was a great
>>> API without support :(.
>>>
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>
>>>
>>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
>>> :

 JB, not sure what you mean? SDFs and triggers are unrelated, and the post
 doesn't mention the word. Did you mean something else, e.g. restriction
 perhaps? Either way I don't think SDFs are the solution here; SDFs have to
 do with the ability to split the processing of *a single element* over
 multiple calls, whereas Romain I think is asking for repeatable grouping of
 *multiple* elements.

 Romain - does
 https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
 do what you want?

 On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré 
 wrote:

> It sounds like the "Trigger" in the Splittable DoFn, no ?
>
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> Regards
> JB
>
>
> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>>
>> it gives the fn/transform the ability to save a state - it can get
>> back on "restart" / whatever unit we 

Re: makes bundle concept usable?

2017-11-16 Thread Jean-Baptiste Onofré
Thanks for the explanation. Agree, we might talk about different things using 
the same wording.


I'm also struggling to understand the use case (for a generic DoFn).

Regards
JB

On 11/17/2017 07:40 AM, Eugene Kirpichov wrote:

To avoid spending a lot of time pursuing a false path, I'd like to say
straight up that SDF is definitely not going to help here, despite the fact
that its API includes the term "checkpoint". In SDF, the "checkpoint"
captures the state of processing within a single element. If you're
applying an SDF to 1000 elements, it will, like any other DoFn, be applied
to each of them independently and in parallel, and you'll have 1000
checkpoints capturing the state of processing each of these elements, which
is probably not what you want.

I'm afraid I still don't understand what kind of checkpoint you need, if it
is not just deterministic grouping into batches. "Checkpoint" is a very
broad term and it's very possible that everybody in this thread is talking
about different things when saying it. So it would help if you could give a
more concrete example: for example, take some IO that you think could be
easier to write with your proposed API, give the contents of a hypothetical
PCollection being written to this IO, give the code of a hypothetical DoFn
implementing the write using your API, and explain what you'd expect to
happen at runtime.

On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau 
wrote:


@Eugene: yes and the other alternative of Reuven too but it is still
1. relying on timers, 2. not really checkpointed

In other words it seems all solutions are to create a chunk of size 1
and replayable to fake the lack of chunking in the framework. This
always implies a chunk handling outside the component (typically
before for an output). My point is I think IO need it in their own
"internal" or at least control it themselves since the chunk size is
part of the IO handling most of the time.

I think JB spoke of the same "group before" trick using restrictions
which can work I have to admit if SDF are implemented by runners. Is
there a roadmap/status on that? Last time I checked SDF was a great
API without support :(.



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 7:25 GMT+01:00 Eugene Kirpichov :

JB, not sure what you mean? SDFs and triggers are unrelated, and the post
doesn't mention the word. Did you mean something else, e.g. restriction
perhaps? Either way I don't think SDFs are the solution here; SDFs have to
do with the ability to split the processing of *a single element* over
multiple calls, whereas Romain I think is asking for repeatable grouping of
*multiple* elements.

Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what you want?

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré 
wrote:


It sounds like the "Trigger" in the Splittable DoFn, no ?

https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

Regards
JB


On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:

it gives the fn/transform the ability to save a state - it can get
back on "restart" / whatever unit we can use, probably runner
dependent? Without that you need to rewrite all IO usage with
something like the previous pattern which makes the IO not self
sufficient and kind of makes the entry cost and usage of beam way
further.

In my mind it is exactly what jbatch/spring-batch uses but adapted to
beam (stream in particular) case.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:49 GMT+01:00 Reuven Lax :

Romain,

Can you define what you mean by checkpoint? What are the semantics, what
does it accomplish?

Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


Yes, what I propose earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element





Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:06 GMT+01:00 Raghu Angadi 

wrote:


This is a fair summary of the current state but also where beam can have a
very strong added value and make big data great and smooth.

Instead of this replay feature isnt checkpointing willable? In particular
with SDF no?


Le 16 nov. 2017 19:50, "Raghu Angadi" 

Re: makes bundle concept usable?

2017-11-16 Thread Eugene Kirpichov
To avoid spending a lot of time pursuing a false path, I'd like to say
straight up that SDF is definitely not going to help here, despite the fact
that its API includes the term "checkpoint". In SDF, the "checkpoint"
captures the state of processing within a single element. If you're
applying an SDF to 1000 elements, it will, like any other DoFn, be applied
to each of them independently and in parallel, and you'll have 1000
checkpoints capturing the state of processing each of these elements, which
is probably not what you want.

I'm afraid I still don't understand what kind of checkpoint you need, if it
is not just deterministic grouping into batches. "Checkpoint" is a very
broad term and it's very possible that everybody in this thread is talking
about different things when saying it. So it would help if you could give a
more concrete example: for example, take some IO that you think could be
easier to write with your proposed API, give the contents of a hypothetical
PCollection being written to this IO, give the code of a hypothetical DoFn
implementing the write using your API, and explain what you'd expect to
happen at runtime.

On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau 
wrote:

> @Eugene: yes and the other alternative of Reuven too but it is still
> 1. relying on timers, 2. not really checkpointed
>
> In other words it seems all solutions are to create a chunk of size 1
> and replayable to fake the lack of chunking in the framework. This
> always implies a chunk handling outside the component (typically
> before for an output). My point is I think IO need it in their own
> "internal" or at least control it themselves since the chunk size is
> part of the IO handling most of the time.
>
> I think JB spoke of the same "group before" trick using restrictions
> which can work I have to admit if SDF are implemented by runners. Is
> there a roadmap/status on that? Last time I checked SDF was a great
> API without support :(.
>
>
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov :
> > JB, not sure what you mean? SDFs and triggers are unrelated, and the post
> > doesn't mention the word. Did you mean something else, e.g. restriction
> > perhaps? Either way I don't think SDFs are the solution here; SDFs have
> to
> > do with the ability to split the processing of *a single element* over
> > multiple calls, whereas Romain I think is asking for repeatable grouping
> of
> > *multiple* elements.
> >
> > Romain - does
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> > do what
> > you want?
> >
> > On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré 
> > wrote:
> >
> >> It sounds like the "Trigger" in the Splittable DoFn, no ?
> >>
> >> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >>
> >> Regards
> >> JB
> >>
> >>
> >> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> >> > it gives the fn/transform the ability to save a state - it can get
> >> > back on "restart" / whatever unit we can use, probably runner
> >> > dependent? Without that you need to rewrite all IO usage with
> >> > something like the previous pattern which makes the IO not self
> >> > sufficient and kind of makes the entry cost and usage of beam way
> >> > further.
> >> >
> >> > In my mind it is exactly what jbatch/spring-batch uses but adapted to
> >> > beam (stream in particular) case.
> >> >
> >> > Romain Manni-Bucau
> >> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >
> >> >
> >> > 2017-11-17 6:49 GMT+01:00 Reuven Lax :
> >> >> Romain,
> >> >>
> >> >> Can you define what you mean by checkpoint? What are the semantics,
> what
> >> >> does it accomplish?
> >> >>
> >> >> Reuven
> >> >>
> >> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
> >> rmannibu...@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> Yes, what I propose earlier was:
> >> >>>
> >> >>> I. checkpoint marker:
> >> >>>
> >> >>> @AnyBeamAnnotation
> >> >>> @CheckpointAfter
> >> >>> public void someHook(SomeContext ctx);
> >> >>>
> >> >>>
> >> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
> >> >>> CountingAlgo()))
> >> >>>
> >> >>> III. (I like this one less)
> >> >>>
> >> >>> // in the dofn
> >> >>> @CheckpointTester
> >> >>> public boolean shouldCheckpoint();
> >> >>>
> >> >>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per
> element
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> Romain Manni-Bucau
> >> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >>>
> >> >>>
> >> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi :
> >>  How would you define it (rough API is fine)? Without more details, it is
> >>  not easy to see wider applicability and feasibility in runners.
> >> 
> >>  On Thu, 

Re: Sink API question

2017-11-16 Thread Eugene Kirpichov
Hi Chet,
It sounds like you want the following pattern:
- Write data in parallel
- Once all parallel writes have completed, gather their results and issue a
commit

The Sink API once did something like that, but it turned out that
the only thing that mapped well onto that API was files; and other things
were either much more complex than the API could express
(BigQueryIO.write()) or much simpler (regular DoFn's). So we removed the
API, because it was useful only for one thing (files) and because it kept
repeatedly confusing people into thinking they need to use it ("I'm writing
to a storage system, I probably should use the Sink API"), whereas in
nearly 100% of the cases they didn't.

However, your use case maps well onto the original goal of Sink. I'd
recommend either looking at how WriteFiles and BigQueryIO.write() work (be
warned: they are very complex) or looking at how the original Sink API
worked. There was nothing special about it, it was just a composite
transform that anyone can write in their pipeline without adding anything
to Beam itself. See original code at 1.8.0:
https://github.com/apache/beam/blob/v1.8.0/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java#L290
- draw inspiration from it and implement your transform in a similar way :)
(though you'll be able to do it much simpler, because you're implementing
an individual case rather than a general-purpose API)
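
For illustration only, a rough sketch of such a composite under stated
assumptions: the class name AtomicIndexWrite and the commented-out index calls
are hypothetical, and this is neither the removed Sink API nor Beam's actual
WriteFiles code. Per-element writes go to a temporary index, and a single
downstream step issues the commit once all write results have been gathered:

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

/** Hypothetical "write to a temporary index, then commit once" composite. */
class AtomicIndexWrite extends PTransform<PCollection<String>, PDone> {
  @Override
  public PDone expand(PCollection<String> input) {
    input
        .apply("WriteToTempIndex", ParDo.of(new DoFn<String, Long>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // The real per-element write to the temporary index would go here.
            c.output(1L); // emit a marker so the commit step can observe completion
          }
        }))
        // Collapsing all write results into one value forces the commit step
        // to run only after every parallel write has finished (bounded input).
        .apply("AwaitWrites", Count.<Long>globally())
        .apply("Commit", ParDo.of(new DoFn<Long, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // The atomic "move temporary index over the live index" call
            // would go here; it runs once per window (once in batch).
          }
        }));
    return PDone.in(input.getPipeline());
  }
}

As with the original Write transform, the "happens after" guarantee here comes
only from the grouping step, so this sketch is really only meaningful for a
bounded input (or per window) and still needs the failure/retry thinking
mentioned above.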

On Thu, Nov 16, 2017 at 5:28 PM Chet Aldrich 
wrote:

> Hello all,
>
> I’m in the process of implementing a way to write data using a PTransform
> to Algolia (https://www.algolia.com/ ).
> However, in the process of doing so I’ve run into a bit of a snag, and was
> curious if someone here would be able to help me figure this out.
>
> Building a DoFn that can accomplish this is straightforward, since it
> essentially just involves creating a bundle of values and then flushing the
> batch out to Algolia using their API client as needed.
>
> However, I’d like to perform the changes to the index atomically, that is,
> to either write all of the data or none of the data in the event of a
> pipeline failure. This can be done in Algolia by moving a temporary index
> on top of an existing one, like they do here:
> https://www.algolia.com/doc/tutorials/indexing/synchronization/atomic-reindexing/
> <
> https://www.algolia.com/doc/tutorials/indexing/synchronization/atomic-reindexing/
> >
>
> This is where it gets a bit more tricky. I noted that there exists a
> @Teardown annotation that allows one to do something like close the client
> when the DoFn is complete on a given machine, but it doesn’t quite do what
> I want.
>
> In theory, I’d like to write to a temporary index, and then when the
> transform has been performed on all elements, I then move the index over,
> completing the operation.
>
> I previous implemented this functionality using the Beam Python SDK using
> the Sink class described here:
> https://beam.apache.org/documentation/sdks/python-custom-io/ <
> https://beam.apache.org/documentation/sdks/python-custom-io/>
>
> I’m making the transition to the Java SDK because of the built in JDBC I/O
> transform. However, I’m finding that this Sink API for java is proving
> elusive, and digging around hasn’t proved fruitful. Specifically, I was
> looking at this page and it seems like it was directing me to ask here if
> I’m not sure whether the functionality I desire can be implemented with a
> DoFn:
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-sink-api
> <
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-sink-api
> >
>
> Is there something that can do something similar to what I just described?
> If there’s something I just missed while digging through the DoFn
> documentation that’d be great, but I didn’t see anything.
>
> Best,
>
> Chet
>
>
>
>


Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
@Eugene: yes, and Reuven's other alternative too, but it is still
1. relying on timers, 2. not really checkpointed

In other words it seems all solutions come down to creating a replayable
chunk of size 1 to fake the chunking the framework lacks. This
always implies handling the chunk outside the component (typically
just before it, for an output). My point is that I think IOs need it
"internally", or at least need to control it themselves, since the chunk
size is part of the IO's handling most of the time.

I think JB spoke of the same "group before" trick using restrictions,
which I have to admit can work if SDF is implemented by the runners. Is
there a roadmap/status on that? Last time I checked, SDF was a great
API without support :(.



Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 7:25 GMT+01:00 Eugene Kirpichov :
> JB, not sure what you mean? SDFs and triggers are unrelated, and the post
> doesn't mention the word. Did you mean something else, e.g. restriction
> perhaps? Either way I don't think SDFs are the solution here; SDFs have to
> do with the ability to split the processing of *a single element* over
> multiple calls, whereas Romain I think is asking for repeatable grouping of
> *multiple* elements.
>
> Romain - does
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> do what
> you want?
>
> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré 
> wrote:
>
>> It sounds like the "Trigger" in the Splittable DoFn, no ?
>>
>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>>
>> Regards
>> JB
>>
>>
>> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
>> > it gives the fn/transform the ability to save a state - it can get
>> > back on "restart" / whatever unit we can use, probably runner
>> > dependent? Without that you need to rewrite all IO usage with
>> > something like the previous pattern which makes the IO not self
>> > sufficient and kind of makes the entry cost and usage of beam way
>> > further.
>> >
>> > In my mind it is exactly what jbatch/spring-batch uses but adapted to
>> > beam (stream in particular) case.
>> >
>> > Romain Manni-Bucau
>> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >
>> >
>> > 2017-11-17 6:49 GMT+01:00 Reuven Lax :
>> >> Romain,
>> >>
>> >> Can you define what you mean by checkpoint? What are the semantics, what
>> >> does it accomplish?
>> >>
>> >> Reuven
>> >>
>> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> >> wrote:
>> >>
>> >>> Yes, what I propose earlier was:
>> >>>
>> >>> I. checkpoint marker:
>> >>>
>> >>> @AnyBeamAnnotation
>> >>> @CheckpointAfter
>> >>> public void someHook(SomeContext ctx);
>> >>>
>> >>>
>> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
>> >>> CountingAlgo()))
>> >>>
>> >>> III. (I like this one less)
>> >>>
>> >>> // in the dofn
>> >>> @CheckpointTester
>> >>> public boolean shouldCheckpoint();
>> >>>
>> >>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> Romain Manni-Bucau
>> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >>>
>> >>>
>> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi :
>>  How would you define it (rough API is fine)? Without more details, it is
>>  not easy to see wider applicability and feasibility in runners.
>> 
>>  On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
>> >>> rmannibu...@gmail.com>
>>  wrote:
>> 
>> > This is a fair summary of the current state but also where beam can
>> >>> have a
>> > very strong added value and make big data great and smooth.
>> >
>> > Instead of this replay feature isnt checkpointing willable? In
>> >>> particular
>> > with SDF no?
>> >
>> >
>> > Le 16 nov. 2017 19:50, "Raghu Angadi"  a
>> > écrit :
>> >
>> >> Core issue here is that there is no explicit concept of 'checkpoint'
>> >>> in
>> >> Beam (UnboundedSource has a method 'getCheckpointMark' but that
>> >>> refers to
>> >> the checkoint on external source). Runners do checkpoint internally
>> as
>> >> implementation detail. Flink's checkpoint model is entirely
>> different
>> > from
>> >> Dataflow's and Spark's.
>> >>
>> >> @StableReplay helps, but it does not explicitly talk about a
>> >>> checkpoint
>> > by
>> >> design.
>> >>
>> >> If you are looking to achieve some guarantees with a sink/DoFn, I
>> >>> think
>> > it
>> >> is better to start with the requirements. I worked on exactly-once
>> >>> sink
>> > for
>> >> Kafka (see KafkaIO.write().withEOS()), where we essentially reshard
>> >>> the
>> >> elements and assign sequence numbers to elements with in each shard.
>> >> Duplicates in replays are avoided 

Re: makes bundle concept usable?

2017-11-16 Thread Jean-Baptiste Onofré

Sorry, not trigger, I meant tracker (a bit early in the morning for me) ;)

The tracker in the SDF controls the restriction/offset/etc. So I think it could
be used to group elements, no?


Regards
JB

On 11/17/2017 07:25 AM, Eugene Kirpichov wrote:

JB, not sure what you mean? SDFs and triggers are unrelated, and the post
doesn't mention the word. Did you mean something else, e.g. restriction
perhaps? Either way I don't think SDFs are the solution here; SDFs have to
do with the ability to split the processing of *a single element* over
multiple calls, whereas Romain I think is asking for repeatable grouping of
*multiple* elements.

Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what
you want?

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré 
wrote:


It sounds like the "Trigger" in the Splittable DoFn, no ?

https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

Regards
JB


On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:

it gives the fn/transform the ability to save a state - it can get
back on "restart" / whatever unit we can use, probably runner
dependent? Without that you need to rewrite all IO usage with
something like the previous pattern which makes the IO not self
sufficient and kind of makes the entry cost and usage of beam way
further.

In my mind it is exactly what jbatch/spring-batch uses but adapted to
beam (stream in particular) case.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:49 GMT+01:00 Reuven Lax :

Romain,

Can you define what you mean by checkpoint? What are the semantics, what
does it accomplish?

Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


Yes, what I propose earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element




Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:06 GMT+01:00 Raghu Angadi :

How would you define it (rough API is fine)? Without more details, it is
not easy to see wider applicability and feasibility in runners.

On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


This is a fair summary of the current state but also where beam can have a
very strong added value and make big data great and smooth.

Instead of this replay feature isnt checkpointing willable? In particular
with SDF no?


Le 16 nov. 2017 19:50, "Raghu Angadi"  a
écrit :


Core issue here is that there is no explicit concept of 'checkpoint' in
Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to
the checkpoint on the external source). Runners do checkpoint internally as
an implementation detail. Flink's checkpoint model is entirely different
from Dataflow's and Spark's.

@StableReplay helps, but it does not explicitly talk about a checkpoint by
design.

If you are looking to achieve some guarantees with a sink/DoFn, I think it
is better to start with the requirements. I worked on an exactly-once sink
for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
elements and assign sequence numbers to elements within each shard.
Duplicates in replays are avoided based on these sequence numbers. DoFn
state API is used to buffer out-of-order replays. The implementation
strategy works in Dataflow but not in Flink, which has a horizontal
checkpoint. KafkaIO checks for compatibility.

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
rmannibu...@gmail.com>
wrote:


Hi guys,

The subject is a bit provocative but the topic is real and coming
again and again with the beam usage: how a dofn can handle some
"chunking".

The need is to be able to commit each N records but with N not too big.

The natural API for that in beam is the bundle one but bundles are not
reliable since they can be very small (flink) - we can say it is "ok"
even if it has some perf impacts - or too big (spark does full size /
#workers).

The workaround is what we see in the ES I/O: a maxSize which does an
eager flush. The issue is that then the checkpoint is not respected
and you can process multiple times the same records.

Any plan to make this API reliable and controllable from a beam point
of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn









--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net

Re: makes bundle concept usable?

2017-11-16 Thread Eugene Kirpichov
JB, not sure what you mean? SDFs and triggers are unrelated, and the post
doesn't mention the word. Did you mean something else, e.g. restriction
perhaps? Either way I don't think SDFs are the solution here; SDFs have to
do with the ability to split the processing of *a single element* over
multiple calls, whereas Romain I think is asking for repeatable grouping of
*multiple* elements.

Romain - does
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
do what
you want?

On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré 
wrote:

> It sounds like the "Trigger" in the Splittable DoFn, no ?
>
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> Regards
> JB
>
>
> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> > it gives the fn/transform the ability to save a state - it can get
> > back on "restart" / whatever unit we can use, probably runner
> > dependent? Without that you need to rewrite all IO usage with
> > something like the previous pattern which makes the IO not self
> > sufficient and kind of makes the entry cost and usage of beam way
> > further.
> >
> > In my mind it is exactly what jbatch/spring-batch uses but adapted to
> > beam (stream in particular) case.
> >
> > Romain Manni-Bucau
> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >
> >
> > 2017-11-17 6:49 GMT+01:00 Reuven Lax :
> >> Romain,
> >>
> >> Can you define what you mean by checkpoint? What are the semantics, what
> >> does it accomplish?
> >>
> >> Reuven
> >>
> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com>
> >> wrote:
> >>
> >>> Yes, what I propose earlier was:
> >>>
> >>> I. checkpoint marker:
> >>>
> >>> @AnyBeamAnnotation
> >>> @CheckpointAfter
> >>> public void someHook(SomeContext ctx);
> >>>
> >>>
> >>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
> >>> CountingAlgo()))
> >>>
> >>> III. (I like this one less)
> >>>
> >>> // in the dofn
> >>> @CheckpointTester
> >>> public boolean shouldCheckpoint();
> >>>
> >>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
> >>>
> >>>
> >>>
> >>>
> >>> Romain Manni-Bucau
> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >>>
> >>>
> >>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi :
>  How would you define it (rough API is fine)?. Without more details,
> it is
>  not easy to see wider applicability and feasibility in runners.
> 
>  On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
> >>> rmannibu...@gmail.com>
>  wrote:
> 
> > This is a fair summary of the current state but also where beam can
> >>> have a
> > very strong added value and make big data great and smooth.
> >
> > Instead of this replay feature isnt checkpointing willable? In
> >>> particular
> > with SDF no?
> >
> >
> > Le 16 nov. 2017 19:50, "Raghu Angadi"  a
> > écrit :
> >
> >> Core issue here is that there is no explicit concept of 'checkpoint'
> >>> in
> >> Beam (UnboundedSource has a method 'getCheckpointMark' but that
> >>> refers to
> >> the checkoint on external source). Runners do checkpoint internally
> as
> >> implementation detail. Flink's checkpoint model is entirely
> different
> > from
> >> Dataflow's and Spark's.
> >>
> >> @StableReplay helps, but it does not explicitly talk about a
> >>> checkpoint
> > by
> >> design.
> >>
> >> If you are looking to achieve some guarantees with a sink/DoFn, I
> >>> think
> > it
> >> is better to start with the requirements. I worked on exactly-once
> >>> sink
> > for
> >> Kafka (see KafkaIO.write().withEOS()), where we essentially reshard
> >>> the
> >> elements and assign sequence numbers to elements with in each shard.
> >> Duplicates in replays are avoided based on these sequence numbers.
> >>> DoFn
> >> state API is used to buffer out-of order replays. The implementation
> >> strategy works in Dataflow but not in Flink which has a horizontal
> >> checkpoint. KafkaIO checks for compatibility.
> >>
> >> On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
> >> rmannibu...@gmail.com>
> >> wrote:
> >>
> >>> Hi guys,
> >>>
> >>> The subject is a bit provocative but the topic is real and coming
> >>> again and again with the beam usage: how a dofn can handle some
> >>> "chunking".
> >>>
> >>> The need is to be able to commit each N records but with N not too
> >>> big.
> >>>
> >>> The natural API for that in beam is the bundle one but bundles are
> >>> not
> >>> reliable since they can be very small (flink) - we can say it is
> >>> "ok"
> >>> even if it has some perf impacts - or too big (spark does full size
> >>> /
> >>> #workers).
> >>>
> >>> The workaround is what we 

Re: makes bundle concept usable?

2017-11-16 Thread Jean-Baptiste Onofré

It sounds like the "Trigger" in the Splittable DoFn, no ?

https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

Regards
JB


On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:

it gives the fn/transform the ability to save a state - it can get
back on "restart" / whatever unit we can use, probably runner
dependent? Without that you need to rewrite all IO usage with
something like the previous pattern which makes the IO not self
sufficient and kind of makes the entry cost and usage of beam way
further.

In my mind it is exactly what jbatch/spring-batch uses but adapted to
beam (stream in particular) case.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:49 GMT+01:00 Reuven Lax :

Romain,

Can you define what you mean by checkpoint? What are the semantics, what
does it accomplish?

Reuven

On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau 
wrote:


Yes, what I propose earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element




Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:06 GMT+01:00 Raghu Angadi :

How would you define it (rough API is fine)?. Without more details, it is
not easy to see wider applicability and feasibility in runners.

On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <

rmannibu...@gmail.com>

wrote:


This is a fair summary of the current state but also where beam can

have a

very strong added value and make big data great and smooth.

Instead of this replay feature isnt checkpointing willable? In

particular

with SDF no?


Le 16 nov. 2017 19:50, "Raghu Angadi"  a
écrit :


Core issue here is that there is no explicit concept of 'checkpoint'

in

Beam (UnboundedSource has a method 'getCheckpointMark' but that

refers to

the checkoint on external source). Runners do checkpoint internally as
implementation detail. Flink's checkpoint model is entirely different

from

Dataflow's and Spark's.

@StableReplay helps, but it does not explicitly talk about a

checkpoint

by

design.

If you are looking to achieve some guarantees with a sink/DoFn, I

think

it

is better to start with the requirements. I worked on exactly-once

sink

for

Kafka (see KafkaIO.write().withEOS()), where we essentially reshard

the

elements and assign sequence numbers to elements with in each shard.
Duplicates in replays are avoided based on these sequence numbers.

DoFn

state API is used to buffer out-of order replays. The implementation
strategy works in Dataflow but not in Flink which has a horizontal
checkpoint. KafkaIO checks for compatibility.

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
rmannibu...@gmail.com>
wrote:


Hi guys,

The subject is a bit provocative but the topic is real and coming
again and again with the beam usage: how a dofn can handle some
"chunking".

The need is to be able to commit each N records but with N not too

big.


The natural API for that in beam is the bundle one but bundles are

not

reliable since they can be very small (flink) - we can say it is

"ok"

even if it has some perf impacts - or too big (spark does full size

/

#workers).

The workaround is what we see in the ES I/O: a maxSize which does an
eager flush. The issue is that then the checkpoint is not respected
and you can process multiple times the same records.

Any plan to make this API reliable and controllable from a beam

point

of view (at least in a max manner)?

Thanks,
Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn









--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


[VOTE] Release 2.2.0, release candidate #4

2017-11-16 Thread Reuven Lax
Hi everyone,

Please review and vote on the release candidate #4 for the version 2.2.0,
as follows:
  [ ] +1, Approve the release
  [ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
  * JIRA release notes [1],
  * the official Apache source release to be deployed to dist.apache.org [2],
which is signed with the key with fingerprint B98B7708 [3],
  * all artifacts to be deployed to the Maven Central Repository [4],
  * source code tag "v2.2.0-RC4" [5],
  * website pull request listing the release and publishing the API
reference manual [6].
  * Java artifacts were built with Maven 3.5.0 and OpenJDK/Oracle JDK
1.8.0_144.
  * Python artifacts are deployed along with the source release to the
dist.apache.org [2].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Reuven

[1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12341044
[2] https://dist.apache.org/repos/dist/dev/beam/2.2.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1025/
[5] https://github.com/apache/beam/tree/v2.2.0-RC4

[6] https://github.com/apache/beam-site/pull/337


Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
it gives the fn/transform the ability to save a state that it can get
back on "restart" / whatever unit we can use (probably runner
dependent). Without that you need to rewrite all IO usages with
something like the previous pattern, which makes the IO not self
sufficient and kind of pushes the entry cost and usability of beam
further away.

In my mind it is exactly what jbatch/spring-batch uses, but adapted to
the beam (stream in particular) case.

Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:49 GMT+01:00 Reuven Lax :
> Romain,
>
> Can you define what you mean by checkpoint? What are the semantics, what
> does it accomplish?
>
> Reuven
>
> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau 
> wrote:
>
>> Yes, what I propose earlier was:
>>
>> I. checkpoint marker:
>>
>> @AnyBeamAnnotation
>> @CheckpointAfter
>> public void someHook(SomeContext ctx);
>>
>>
>> II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
>> CountingAlgo()))
>>
>> III. (I like this one less)
>>
>> // in the dofn
>> @CheckpointTester
>> public boolean shouldCheckpoint();
>>
>> IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
>>
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-17 6:06 GMT+01:00 Raghu Angadi :
>> > How would you define it (rough API is fine)?. Without more details, it is
>> > not easy to see wider applicability and feasibility in runners.
>> >
>> > On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> > wrote:
>> >
>> >> This is a fair summary of the current state but also where beam can
>> have a
>> >> very strong added value and make big data great and smooth.
>> >>
>> >> Instead of this replay feature isnt checkpointing willable? In
>> particular
>> >> with SDF no?
>> >>
>> >>
>> >> Le 16 nov. 2017 19:50, "Raghu Angadi"  a
>> >> écrit :
>> >>
>> >> > Core issue here is that there is no explicit concept of 'checkpoint'
>> in
>> >> > Beam (UnboundedSource has a method 'getCheckpointMark' but that
>> refers to
>> >> > the checkoint on external source). Runners do checkpoint internally as
>> >> > implementation detail. Flink's checkpoint model is entirely different
>> >> from
>> >> > Dataflow's and Spark's.
>> >> >
>> >> > @StableReplay helps, but it does not explicitly talk about a
>> checkpoint
>> >> by
>> >> > design.
>> >> >
>> >> > If you are looking to achieve some guarantees with a sink/DoFn, I
>> think
>> >> it
>> >> > is better to start with the requirements. I worked on exactly-once
>> sink
>> >> for
>> >> > Kafka (see KafkaIO.write().withEOS()), where we essentially reshard
>> the
>> >> > elements and assign sequence numbers to elements with in each shard.
>> >> > Duplicates in replays are avoided based on these sequence numbers.
>> DoFn
>> >> > state API is used to buffer out-of order replays. The implementation
>> >> > strategy works in Dataflow but not in Flink which has a horizontal
>> >> > checkpoint. KafkaIO checks for compatibility.
>> >> >
>> >> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> >> > rmannibu...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi guys,
>> >> > >
>> >> > > The subject is a bit provocative but the topic is real and coming
>> >> > > again and again with the beam usage: how a dofn can handle some
>> >> > > "chunking".
>> >> > >
>> >> > > The need is to be able to commit each N records but with N not too
>> big.
>> >> > >
>> >> > > The natural API for that in beam is the bundle one but bundles are
>> not
>> >> > > reliable since they can be very small (flink) - we can say it is
>> "ok"
>> >> > > even if it has some perf impacts - or too big (spark does full size
>> /
>> >> > > #workers).
>> >> > >
>> >> > > The workaround is what we see in the ES I/O: a maxSize which does an
>> >> > > eager flush. The issue is that then the checkpoint is not respected
>> >> > > and you can process multiple times the same records.
>> >> > >
>> >> > > Any plan to make this API reliable and controllable from a beam
>> point
>> >> > > of view (at least in a max manner)?
>> >> > >
>> >> > > Thanks,
>> >> > > Romain Manni-Bucau
>> >> > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> >> > >
>> >> >
>> >>
>>


Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
Yes, what I proposed earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new
CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the dofn per element
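
To make option I a bit more concrete, a purely hypothetical sketch follows.
None of these annotations or types (@CheckpointAfter, SomeContext) exist in
Beam today; they are declared inline only so the example is self-contained and
shows where such a hook would sit in a DoFn:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical marker, NOT a Beam annotation: a runner supporting it would
// call the annotated method once the elements emitted so far are durably
// committed.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface CheckpointAfter {}

// Hypothetical context type for the hook.
interface SomeContext {
  long checkpointId();
}

class ChunkedWriteFn extends DoFn<String, Void> {
  private final List<String> pending = new ArrayList<>();

  @ProcessElement
  public void process(ProcessContext c) {
    pending.add(c.element()); // buffer until the runner says it is safe to flush
  }

  // Proposed hook (option I): flush/acknowledge the buffered records in the
  // backend only once the runner has checkpointed them.
  @CheckpointAfter
  public void onCheckpoint(SomeContext ctx) {
    // backend.flush(pending); // hypothetical backend call
    pending.clear();
  }
}

Option II would then just be pipeline-level configuration choosing when the
runner invokes that hook (e.g. every N records), which is what the hypothetical
CountingAlgo above stands for.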




Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:06 GMT+01:00 Raghu Angadi :
> How would you define it (rough API is fine)?. Without more details, it is
> not easy to see wider applicability and feasibility in runners.
>
> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau 
> wrote:
>
>> This is a fair summary of the current state but also where beam can have a
>> very strong added value and make big data great and smooth.
>>
>> Instead of this replay feature isnt checkpointing willable? In particular
>> with SDF no?
>>
>>
>> Le 16 nov. 2017 19:50, "Raghu Angadi"  a
>> écrit :
>>
>> > Core issue here is that there is no explicit concept of 'checkpoint' in
>> > Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to
>> > the checkoint on external source). Runners do checkpoint internally as
>> > implementation detail. Flink's checkpoint model is entirely different
>> from
>> > Dataflow's and Spark's.
>> >
>> > @StableReplay helps, but it does not explicitly talk about a checkpoint
>> by
>> > design.
>> >
>> > If you are looking to achieve some guarantees with a sink/DoFn, I think
>> it
>> > is better to start with the requirements. I worked on exactly-once sink
>> for
>> > Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
>> > elements and assign sequence numbers to elements with in each shard.
>> > Duplicates in replays are avoided based on these sequence numbers. DoFn
>> > state API is used to buffer out-of order replays. The implementation
>> > strategy works in Dataflow but not in Flink which has a horizontal
>> > checkpoint. KafkaIO checks for compatibility.
>> >
>> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> > rmannibu...@gmail.com>
>> > wrote:
>> >
>> > > Hi guys,
>> > >
>> > > The subject is a bit provocative but the topic is real and coming
>> > > again and again with the beam usage: how a dofn can handle some
>> > > "chunking".
>> > >
>> > > The need is to be able to commit each N records but with N not too big.
>> > >
>> > > The natural API for that in beam is the bundle one but bundles are not
>> > > reliable since they can be very small (flink) - we can say it is "ok"
>> > > even if it has some perf impacts - or too big (spark does full size /
>> > > #workers).
>> > >
>> > > The workaround is what we see in the ES I/O: a maxSize which does an
>> > > eager flush. The issue is that then the checkpoint is not respected
>> > > and you can process multiple times the same records.
>> > >
>> > > Any plan to make this API reliable and controllable from a beam point
>> > > of view (at least in a max manner)?
>> > >
>> > > Thanks,
>> > > Romain Manni-Bucau
>> > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> > >
>> >
>>


Re: Sink API question

2017-11-16 Thread Jean-Baptiste Onofré

Hi,

if you take a look at the existing IOs, most of them don't use the Sink API: they
implement the sink using a DoFn.


I think Algolia would be the same for the write. What do you think about
updating the index when we finalize a bundle?


NB: what's the Algolia "client/API" license? Just to double check that it can
be part of Beam.
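
For illustration, a minimal sketch of that DoFn-based sink with a bundle-level
flush; the Algolia-side call is stubbed out, and the names (AlgoliaWriteFn, the
temporary index) are assumptions rather than an existing Beam transform:

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

class AlgoliaWriteFn extends DoFn<String, Void> {
  private transient List<String> batch;

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    batch.add(c.element()); // buffer records belonging to the current bundle
  }

  @FinishBundle
  public void finishBundle() {
    // Hypothetical client call: push the buffered records to the (temporary)
    // Algolia index when the runner finishes the bundle.
    // client.saveObjects("tmp_index", batch);
    batch.clear();
  }
}

The atomic "move the temporary index over the live one" step discussed in this
thread would still have to happen after all writes complete, e.g. with a
composite transform like the one sketched in Eugene's reply.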


Regards
JB

On 11/17/2017 02:28 AM, Chet Aldrich wrote:

Hello all,

I’m in the process of implementing a way to write data using a PTransform to Algolia 
(https://www.algolia.com/ ). However, in the process 
of doing so I’ve run into a bit of a snag, and was curious if someone here would be 
able to help me figure this out.

Building a DoFn that can accomplish this is straightforward, since it 
essentially just involves creating a bundle of values and then flushing the 
batch out to Algolia using their API client as needed.

However, I’d like to perform the changes to the index atomically, that is, to either 
write all of the data or none of the data in the event of a pipeline failure. This 
can be done in Algolia by moving a temporary index on top of an existing one, like 
they do here: 
https://www.algolia.com/doc/tutorials/indexing/synchronization/atomic-reindexing/ 


This is where it gets a bit more tricky. I noted that there exists a @Teardown 
annotation that allows one to do something like close the client when the DoFn 
is complete on a given machine, but it doesn’t quite do what I want.

In theory, I’d like to write to a temporary index, and then when the transform 
has been performed on all elements, I then move the index over, completing the 
operation.

I previous implemented this functionality using the Beam Python SDK using the Sink 
class described here: https://beam.apache.org/documentation/sdks/python-custom-io/ 


I’m making the transition to the Java SDK because of the built in JDBC I/O transform. 
However, I’m finding that this Sink API for java is proving elusive, and digging 
around hasn’t proved fruitful. Specifically, I was looking at this page and it seems 
like it was directing me to ask here if I’m not sure whether the functionality I 
desire can be implemented with a DoFn: 
https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-sink-api
 

 

Is there something that can do something similar to what I just described? If 
there’s something I just missed while digging through the DoFn documentation 
that’d be great, but I didn’t see anything.

Best,

Chet






--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Sink API question

2017-11-16 Thread Chet Aldrich
Hello all, 

I’m in the process of implementing a way to write data using a PTransform to 
Algolia (https://www.algolia.com/ ). However, in the 
process of doing so I’ve run into a bit of a snag, and was curious if someone 
here would be able to help me figure this out. 

Building a DoFn that can accomplish this is straightforward, since it 
essentially just involves creating a bundle of values and then flushing the 
batch out to Algolia using their API client as needed. 

However, I’d like to perform the changes to the index atomically, that is, to 
either write all of the data or none of the data in the event of a pipeline 
failure. This can be done in Algolia by moving a temporary index on top of an 
existing one, like they do here: 
https://www.algolia.com/doc/tutorials/indexing/synchronization/atomic-reindexing/
 


This is where it gets a bit more tricky. I noted that there exists a @Teardown 
annotation that allows one to do something like close the client when the DoFn 
is complete on a given machine, but it doesn’t quite do what I want. 

In theory, I’d like to write to a temporary index, and then when the transform 
has been performed on all elements, I then move the index over, completing the 
operation.

I previously implemented this functionality with the Beam Python SDK, using the 
Sink class described here: 
https://beam.apache.org/documentation/sdks/python-custom-io/ 


I’m making the transition to the Java SDK because of the built in JDBC I/O 
transform. However, I’m finding that this Sink API for java is proving elusive, 
and digging around hasn’t proved fruitful. Specifically, I was looking at this 
page and it seems like it was directing me to ask here if I’m not sure whether 
the functionality I desire can be implemented with a DoFn: 
https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-sink-api
 

   

Is there something that can do something similar to what I just described? If 
there’s something I just missed while digging through the DoFn documentation 
that’d be great, but I didn’t see anything. 

Best, 

Chet 





Re: makes bundle concept usable?

2017-11-16 Thread Raghu Angadi
Core issue here is that there is no explicit concept of 'checkpoint' in
Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to
the checkpoint on the external source). Runners do checkpoint internally as
an implementation detail. Flink's checkpoint model is entirely different from
Dataflow's and Spark's.

@StableReplay helps, but it does not explicitly talk about a checkpoint by
design.

If you are looking to achieve some guarantees with a sink/DoFn, I think it
is better to start with the requirements. I worked on an exactly-once sink for
Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
elements and assign sequence numbers to elements within each shard.
Duplicates in replays are avoided based on these sequence numbers. DoFn
state API is used to buffer out-of-order replays. The implementation
strategy works in Dataflow but not in Flink, which has a horizontal
checkpoint. KafkaIO checks for compatibility.
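
As a much simplified illustration of the sequence-number idea (this is not
KafkaIO's actual EOS code, and it skips the out-of-order buffering mentioned
above): a stateful DoFn keyed by shard remembers the highest sequence number it
has already written and drops replayed elements at or below it.

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Input: shard id -> (sequence number, record). Assumes elements within a
// shard arrive in sequence order, which is the part the real sink relaxes by
// buffering out-of-order replays in state.
class DedupBySequenceFn extends DoFn<KV<Integer, KV<Long, String>>, String> {

  @StateId("lastSeq")
  private final StateSpec<ValueState<Long>> lastSeqSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(ProcessContext c, @StateId("lastSeq") ValueState<Long> lastSeq) {
    long seq = c.element().getValue().getKey();
    Long committed = lastSeq.read();
    if (committed != null && seq <= committed) {
      return; // a replay of something already written: drop it
    }
    // Hypothetical backend write for the new record would go here.
    c.output(c.element().getValue().getValue());
    lastSeq.write(seq);
  }
}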

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau 
wrote:

> Hi guys,
>
> The subject is a bit provocative but the topic is real and coming
> again and again with the beam usage: how a dofn can handle some
> "chunking".
>
> The need is to be able to commit each N records but with N not too big.
>
> The natural API for that in beam is the bundle one but bundles are not
> reliable since they can be very small (flink) - we can say it is "ok"
> even if it has some perf impacts - or too big (spark does full size /
> #workers).
>
> The workaround is what we see in the ES I/O: a maxSize which does an
> eager flush. The issue is that then the checkpoint is not respected
> and you can process multiple times the same records.
>
> Any plan to make this API reliable and controllable from a beam point
> of view (at least in a max manner)?
>
> Thanks,
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>


[Proposal] IOIT test parameters validation

2017-11-16 Thread Łukasz Gajowy
Hi all!

We are currently working on the IO IT "test harness" that will allow running
the IOITs on various runners and filesystems and with a changing amount of data.
It is described in a doc [1] that some of you have probably seen and commented
on (in the context of the BEAM-3060 [2] task).

Part of the job that in our opinion could be done is to somehow validate
the input parameters that are passed to the test. By saying "test input
parameters" I specifically mean:
 - parameters passed as mvn's system properties, such as:
"-Dfilesystem=gcs" or "-DintegrationTestRunner=dataflow"
 - parameters that are to be passed as PipelineOptions, such as
"numberOfRecords" or "filenamePrefix" in TextIO

We imagine a situation where test parameters are passed in an incompatible
way, e.g. someone wants to use the "dataflow" runner on a
filesystem that is unsupported there (say, s3). Running an IOIT with such a
setup will most certainly fail. The crux of the idea is to inform the
developer early enough that no such errors are made and test execution
time is saved. It eases debugging and avoids potential configuration errors.

The doc [1] fully specifies what has to be validated. We'd like to validate:
1. runner + filesystem combination  - both passed as system properties
2. filesystem + pipeline options - as some additional options may be
required by specific FS.
3. runner + pipeline options  - as some additional options may be required
by specific runner
4. IO test class instance (eg. TextIO) + options dedicated to it (eg.
numberOfRecords)

We have an idea to write a small maven plugin for the validation. An
initial PoC just to show the concept is on GitHub [3]. The plugin would
essentially be a small module in the ".test-infra" folder - the same as
other test-related stuff such as kubernetes scripts or jenkins files. We
should hide only the validation logic there - what gets validated should be
declaratively provided in the pom.xml of each IOIT. All necessary validation
configuration could be passed in the plugin's configuration section [4], [5].
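
To make this more tangible, a rough sketch of what such a Mojo might look like.
The plugin name, parameter names and the example rule table are all made up
here; the real rules would come from each IOIT's pom.xml as described above:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.plugins.annotations.LifecyclePhase;
import org.apache.maven.plugins.annotations.Mojo;
import org.apache.maven.plugins.annotations.Parameter;

@Mojo(name = "validate-io-it-props", defaultPhase = LifecyclePhase.VALIDATE)
public class ValidateIoItPropsMojo extends AbstractMojo {

  // Values would normally come from -Dfilesystem=... / -DintegrationTestRunner=...
  @Parameter(property = "filesystem", defaultValue = "local")
  private String filesystem;

  @Parameter(property = "integrationTestRunner", defaultValue = "direct")
  private String runner;

  // Example rule set only; the entries are illustrative, not real support claims.
  private static final Map<String, List<String>> SUPPORTED = new HashMap<>();
  static {
    SUPPORTED.put("dataflow", Arrays.asList("gcs", "local"));
    SUPPORTED.put("direct", Arrays.asList("local", "gcs", "hdfs"));
  }

  @Override
  public void execute() throws MojoExecutionException {
    List<String> allowed = SUPPORTED.get(runner);
    if (allowed == null || !allowed.contains(filesystem)) {
      // Failing in the validate phase breaks the build before any test runs.
      throw new MojoExecutionException(
          "Unsupported combination: runner=" + runner + ", filesystem=" + filesystem);
    }
  }
}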

Pros:
 + input parameters validation is run very early - before compilation. We
don't waste time and wait for the tests to fail
 + validation can be run depending on profile - we can append the plugin to
the profile's build section (eg. to an io-it profiles build section).
 + validation is run once - after the first failure there is an exception
thrown which breaks the build (in case of running more than one test)
 + (AFAIK) we could support multiple sdks in the future with this plugin -
the only condition is being able to start the mvn plugin in some initial
build phase
 + (AFAIK) we can run it on gradle or adapt it to be a gradle task too
 + every IO can have a set of validation rules defined in its pom.xml,
which is readable and clearly shows which combinations are allowed and which are not

Cons:
- everytime a new filesystem/runner etc is supported we have to update the
rules
- the plugin needs to be a separate module

Concerns:
 - is validating all the four points there a good idea? Perhaps the scope
of the things to validate in the plugin is too wide? As far as I know,
validation for runner specific PipelineOptions is already available.

What do you think?

Thanks,
Łukasz

[1]
https://docs.google.com/document/d/1dA-5s6OHiP_cz-NRAbwapoKF5MEC1wKps4A5tFbIPKE/edit#heading=h.84fcpdbcdqu

[2] https://issues.apache.org/jira/browse/BEAM-3060
[3]
https://github.com/lgajowy/beam/blob/validator-poc/.test-infra/testprops/src/main/java/org/apache/beam/testprops/TestPropsPlugin.java
[4]
https://github.com/lgajowy/beam/blob/validator-poc/sdks/java/io/jdbc/pom.xml#L46
[5]
https://github.com/lgajowy/beam/blob/validator-poc/sdks/java/io/file-based-io-tests/pom.xml#L47


Re: Memory consumption & ValueProvider.RuntimeValueProvider#optionsMap

2017-11-16 Thread Lukasz Cwik
I filed https://issues.apache.org/jira/browse/BEAM-3202

On Thu, Nov 16, 2017 at 9:19 AM, Lukasz Cwik  wrote:

> That seems like a bug since it's expected that the options id is always set
> to something when the PipelineOptions object is created so when
> serialized/deserialized the same options id is always returned.
>
> Seems like a trivial fix in PipelineOptionsFactory to always just call
> getOptionsId at least once to ensure it gets populated with a value by the
> default value factory.
>
> On Thu, Nov 16, 2017 at 3:54 AM, Stas Levin  wrote:
>
>> Hi all,
>>
>> I'm investigating a memory consumption issue (leak, so it seems) and was
>> wondering if it could be related to the way runtime options are handled.
>> In particular, upon deserializing a PipelineOptions object,
>> ProxyInvocationHandler.Deserializer
>> calls ValueProvider.RuntimeValueProvider.setRuntimeOptions(options) which
>> stores the (newly) deserialized PipelineOptions instance in a static map
>> inside the RuntimeValueProvider class, where the key is an id obtained by
>> calling deserializedOptions.getOptionsId().
>>
>> The thing is, performing a serialize-deserialize cycle on a given
>> PipelineOptions instance and invoking getOptionsId() yields different
>> optionsIds. Therefore, multiple deserializations of the same
>> PipelineOptions instance result in new keys being added to the static
>> "optionsMap" map inside the ValueProvider.RuntimeValueProvider class.
>>
>> I wasn't able to identify any removals from this static, long-lived map
>> (ValueProvider.RuntimeValueProvider#optionsMap), any chance it is
>> ever-growing?
>> Am I missing something about the way things interact when a given
>> PipelineOptions instance gets deserialized numerous times?
>>
>> Regards,
>> Stas
>>
>
>


Re: Memory consumption & ValueProvider.RuntimeValueProvider#optionsMap

2017-11-16 Thread Lukasz Cwik
That seems like a bug, since it's expected that the options id is always set
to something when the PipelineOptions object is created, so that when
serialized/deserialized the same options id is always returned.

Seems like a trivial fix in PipelineOptionsFactory to always just call
getOptionsId at least once to ensure it gets populated with a value by the
default value factory.
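
Roughly, the idea is something like this (an illustrative sketch only, not the
actual PipelineOptionsFactory code - the helper just shows where the eager read
of the id would go):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class OptionsIdEagerInit {
  static <T extends PipelineOptions> T create(Class<T> clazz) {
    T options = PipelineOptionsFactory.as(clazz);
    // Reading the id once forces the default value factory to populate it, so
    // every later serialize/deserialize cycle sees the same optionsId and the
    // static RuntimeValueProvider#optionsMap gets one stable key per instance.
    options.getOptionsId();
    return options;
  }
}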

On Thu, Nov 16, 2017 at 3:54 AM, Stas Levin  wrote:

> Hi all,
>
> I'm investigating a memory consumption issue (leak, so it seems) and was
> wondering if it could be related to the way runtime options are handled.
> In particular, upon deserializing a PipelineOptions object,
> ProxyInvocationHandler.Deserializer
> calls ValueProvider.RuntimeValueProvider.setRuntimeOptions(options) which
> stores the (newly) deserialized PipelineOptions instance in a static map
> inside the RuntimeValueProvider class, where the key is an id obtained by
> calling deserializedOptions.getOptionsId().
>
> The thing is, performing a serialize-deserialize cycle on a given
> PipelineOptions instance and invoking getOptionsId() yields different
> optionsIds. Therefore, multiple deserializations of the same
> PipelineOptions instance result in new keys being added to the static
> "optionsMap" map inside the ValueProvider.RuntimeValueProvider class.
>
> I wasn't able to identify any removals from this static, long-lived map
> (ValueProvider.RuntimeValueProvider#optionsMap), any chance it is
> ever-growing?
> Am I missing something about the way things interact when a given
> PipelineOptions instance gets deserialized numerous times?
>
> Regards,
> Stas
>


Re: makes bundle concept usable?

2017-11-16 Thread Romain Manni-Bucau
2017-11-16 12:18 GMT+01:00 Reuven Lax :
> On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau 
> wrote:
>
>> @Reuven: it looks like a good workaround
>> @Ken: thks a lot for the link!
>>
>> @all:
>>
>> 1. do you think it is doable without windowing usage (to have
>> something more reliable in terms of runners, since it would depend on fewer
>> primitives)?
>>
>
> This depends on triggering, not on windowing. Triggering is a pretty core
> component of the model - no unbounded inputs can be processed at all
> without triggering. "Checkpointing" is a harder thing to pin down, as it
> means different things to different runners (e.g. "checkpointing" in Flink
> means something very different than in Dataflow and different than in
> Spark).

My bad, sorry about the semantic abuse. The point still stands, however.
Several IOs (all of them, really) are trigger-independent, so I wonder whether
we want to couple them to triggering for a "core feature" (chunking is needed
in almost all IOs).

>
>
>
>> 2. what about allowing the user to define when to checkpoint?
>>
>
> As I mentioned, "checkpoint" is sometimes an ill-defined operation,
> especially across different runners. Instead I think it's better to have
> an annotation that defines the semantics you want (e.g. stable replay), and
> let the runner decide how to implement it (possibly by checkpointing).

The issue with that solution is that it doesn't scale: it requires an
implementation per semantic instead of relying on a single primitive
for all cases. Given the runner design and the portability goal of
Beam, we should probably avoid that, no?

>
> 3. can we get this kind of "composite" pattern in the beam core?
>>
>
> I don't see why not. Though we first need to get @StableReplay implemented.
>
>
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles :
>> > In case the connection is not clear to folks on this thread, I pinged the
>> > thread on @StableReplay / @RequiresStableInput / etc and opened a draft
>> PR
>> > at https://github.com/apache/beam/pull/4135.
>> >
>> > On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax 
>> > wrote:
>> >
>> >> so I think the following will do exactly that and can be easily factored
>> >> into a reusable transform (modulo Java type boilerplate):
>> >>
>> >> pCollection
>> >>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
>> >>     .apply(Window.into(new GlobalWindows())
>> >>         .triggering(AfterWatermark.pastEndOfWindow()
>> >>             .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
>> >>     .apply(GroupByKey.create())
>> >>     .apply(ParDo.of(new DoFn<>() {
>> >>       @ProcessElement
>> >>       @StableReplay
>> >>       public void processElement(ProcessContext c) {
>> >>         // Insert c.element().getValue() into backend.
>> >>       }
>> >>     }));
>> >>
>> >> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com
>> >> >
>> >> wrote:
>> >>
>> >> > 2017-11-15 11:42 GMT+01:00 Reuven Lax :
>> >> > > Can we describe this at a higher level?
>> >> > >
>> >> > > I think what you want is the following. Please correct if I'm
>> >> > > misunderstanding.
>> >> > >
>> >> > > Batches of 100 elements (is this a hard requirement, or do they have to
>> >> > > be "approximately" 100 elements?)
>> >> >
>> >> > Approximately is fine as long as it is documented (what is not fine is 100
>> >> > instead of 10, for instance)
>> >> >
>> >> > >
>> >> > > Once you see a batch, you're guaranteed to see the same batch on
>> >> retries.
>> >> >
>> >> > +1
>> >> >
>> >> > >
>> >> > > You want to then idempotently insert this batch into some backend.
>> >> Things
>> >> > > may fail, workers may crash, but in that case you want to get the
>> exact
>> >> > > same batch back so you can insert it again.
>> >> >
>> >> > +1
>> >> >
>> >> > >
>> >> > > Do you care about ordering? On failure do you have to see the same
>> >> > batches
>> >> > > in the same order as before, or is it sufficient to see the same
>> >> batches?
>> >> >
>> >> > Beam doesn't guarantee ordering everywhere, so I guess it is not important
>> >> > - at least for my cases this statement is true.
>> >> >
>> >> > >
>> >> > > Reuven
>> >> > >
>> >> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
>> >> > rmannibu...@gmail.com>
>> >> > > wrote:
>> >> > >
>> >> > >> Overall goal is to ensure each 100 elements max, a "backend" (as
>> >> > >> datastore) flush/commit/push is done and is aligned with beam
>> >> > >> checkpoints. You can see it as bringing the "general"
>> commit-interval
>> >> > >> notion to beam and kind of get rid of the bundle notion which is
>> >> > >> almost impossible to use today.
>> >> > >>
>> >> > >> Romain Manni-Bucau
>> >> > >> 

[VOTE] Choose the "new" Spark runner

2017-11-16 Thread Jean-Baptiste Onofré

Hi guys,

To illustrate the current discussion about Spark version support, you can take
a look at:


--
Spark 1 & Spark 2 Support Branch

https://github.com/jbonofre/beam/tree/BEAM-1920-SPARK2-MODULES

This branch contains a Spark runner common module compatible with both Spark 1.x
and 2.x. For convenience, we introduced spark1 & spark2 modules/artifacts
containing just a pom.xml to define the dependency set.


--
Spark 2 Only Branch

https://github.com/jbonofre/beam/tree/BEAM-1920-SPARK2-ONLY

This branch is an upgrade to Spark 2.x and drops support for Spark 1.x.

As I'm ready to merge one or the other in the PR, I would like to complete the
vote/discussion pretty soon.


Correct me if I'm wrong, but it seems that the preference is to drop Spark 1.x 
to focus only on Spark 2.x (for the Spark 2 Only Branch).


I would like to call a final vote to decide which branch I will merge:

[ ] Use Spark 1 & Spark 2 Support Branch
[ ] Use Spark 2 Only Branch

This informal vote is open for 48 hours.

Please, let me know what your preference is.

Thanks !
Regards
JB

On 11/13/2017 09:32 AM, Jean-Baptiste Onofré wrote:

Hi Beamers,

I'm forwarding this discussion & vote from the dev mailing list to the user 
mailing list.

The goal is to have your feedback as user.

Basically, we have two options:
1. Right now, in the PR, we support both Spark 1.x and 2.x using three artifacts
(common, spark1, spark2). You, as users, pick spark1 or spark2 in your
dependency set depending on the target Spark version you want.
2. The other option is to upgrade and focus on Spark 2.x in Beam 2.3.0. If you
still want to use Spark 1.x, then you will be stuck at Beam 2.2.0.


Thoughts ?

Thanks !
Regards
JB


 Forwarded Message 
Subject: [VOTE] Drop Spark 1.x support to focus on Spark 2.x
Date: Wed, 8 Nov 2017 08:27:58 +0100
From: Jean-Baptiste Onofré 
Reply-To: dev@beam.apache.org
To: dev@beam.apache.org

Hi all,

as you might know, we are working on Spark 2.x support in the Spark runner.

I'm working on a PR about that:

https://github.com/apache/beam/pull/3808

Today, we have something working with both Spark 1.x and 2.x from a code
standpoint, but I still have to deal with dependencies. This is the first step of
the update, as I'm still using RDDs; the second step would be to support
DataFrames (but for that, I would need PCollection elements with schemas, which
is another topic Eugene, Reuven and I are discussing).


However, as all major distributions now ship Spark 2.x, I don't think it's
necessary to support Spark 1.x anymore.


If we agree, I will update and clean up the PR to support and focus only on Spark
2.x.


So, that's why I'm calling for a vote:

   [ ] +1 to drop Spark 1.x support and upgrade to Spark 2.x only
   [ ] 0 (I don't care ;))
   [ ] -1, I would like to keep supporting Spark 1.x, and so keep support for
both Spark 1.x and 2.x (please provide a specific comment)


This vote is open for 48 hours (I have the commits ready, just waiting for the
end of the vote to push to the PR).


Thanks !
Regards
JB


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Memory consumption & ValueProvider.RuntimeValueProvider#optionsMap

2017-11-16 Thread Stas Levin
Hi all,

I'm investigating a memory consumption issue (leak, so it seems) and was
wondering if it could be related to the way runtime options are handled.
In particular, upon deserializing a PipelineOptions object,
ProxyInvocationHandler.Deserializer
calls ValueProvider.RuntimeValueProvider.setRuntimeOptions(options) which
stores the (newly) deserialized PipelineOptions instance in a static map
inside the RuntimeValueProvider class, where the key is an id obtained by
calling deserializedOptions.getOptionsId().

The thing is, performing a serialize-deserialize cycle on a given
PipelineOptions instance and invoking getOptionsId() yields different
optionsIds. Therefore, multiple deserializations of the same
PipelineOptions instance result in new keys being added to the static
"optionsMap" map inside the ValueProvider.RuntimeValueProvider class.

I wasn't able to identify any removals from this static, long-lived map
(ValueProvider.RuntimeValueProvider#optionsMap) - any chance it is
ever-growing?
Am I missing something about the way things interact when a given
PipelineOptions instance gets deserialized numerous times?

Regards,
Stas


Re: makes bundle concept usable?

2017-11-16 Thread Reuven Lax
On Wed, Nov 15, 2017 at 9:16 PM, Romain Manni-Bucau 
wrote:

> @Reuven: it looks like a good workaround
> @Ken: thks a lot for the link!
>
> @all:
>
> 1. do you think it is doable without windowing usage (to have
> something more reliable in terms of runners, since it would depend on fewer
> primitives)?
>

This depends on triggering, not on windowing. Triggering is a pretty core
component of the model - no unbounded inputs can be processed at all
without triggering. "Checkpointing" is a harder thing to pin down, as it
means different things to different runners (e.g. "checkpointing" in Flink
means something very different than in Dataflow and different than in
Spark).



> 2. what about allowing the user to define when to checkpoint?
>

As I mentioned, "checkpoint" is sometimes an ill-defined operation,
especially across different runners. Instead I think it's better to have
an annotation that defines the semantics you want (e.g. stable replay), and
let the runner decide how to implement it (possibly by checkpointing).

3. can we get this kind of "composite" pattern in the beam core?
>

I don't see why not. Though we first need to get @StableReplay implemented.
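
For reference, a self-contained sketch of what such a reusable composite could
look like (purely illustrative: the transform name, the sharding parameter and
the exact trigger wiring are my own choices here, and @StableReplay itself is
still only a draft PR):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// Hypothetical composite: groups the input into batches of roughly batchSize
// elements. A downstream DoFn annotated with @StableReplay would then write
// each Iterable<T> to the backend and see the same batch again on retry.
public class BatchElements<T>
    extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {

  private final int batchSize;
  private final int numShards;

  public BatchElements(int batchSize, int numShards) {
    this.batchSize = batchSize;
    this.numShards = numShards;
  }

  @Override
  public PCollection<Iterable<T>> expand(PCollection<T> input) {
    return input
        // Spread elements over a few random keys so batching can parallelize.
        .apply(WithKeys.of((T e) -> ThreadLocalRandom.current().nextInt(numShards))
            .withKeyType(TypeDescriptors.integers()))
        // Emit a pane roughly every batchSize elements per key.
        .apply(Window.<KV<Integer, T>>into(new GlobalWindows())
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterPane.elementCountAtLeast(batchSize)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(GroupByKey.<Integer, T>create())
        .apply(Values.<Iterable<T>>create());
  }
}

With something like this in core, an IO would only need the stable-input
guarantee on its write DoFn; it would never have to deal with triggers or
windows itself.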


>
>
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles :
> > In case the connection is not clear to folks on this thread, I pinged the
> > thread on @StableReplay / @RequiresStableInput / etc and opened a draft
> PR
> > at https://github.com/apache/beam/pull/4135.
> >
> > On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax 
> > wrote:
> >
> >> so I think the following will do exactly that and can be easily factored
> >> into a reusable transform (modulo Java type boilerplate):
> >>
> >> pCollection
> >>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
> >>     .apply(Window.into(new GlobalWindows())
> >>         .triggering(AfterWatermark.pastEndOfWindow()
> >>             .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
> >>     .apply(GroupByKey.create())
> >>     .apply(ParDo.of(new DoFn<>() {
> >>       @ProcessElement
> >>       @StableReplay
> >>       public void processElement(ProcessContext c) {
> >>         // Insert c.element().getValue() into backend.
> >>       }
> >>     }));
> >>
> >> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com
> >> >
> >> wrote:
> >>
> >> > 2017-11-15 11:42 GMT+01:00 Reuven Lax :
> >> > > Can we describe this at a higher level?
> >> > >
> >> > > I think what you want is the following. Please correct if I'm
> >> > > misunderstanding.
> >> > >
> >> > > Batches of 100 elements (is this a hard requirement, or do they have to
> >> > > be "approximately" 100 elements?)
> >> >
> >> > Approximately is fine as long as it is documented (what is not fine is 100
> >> > instead of 10, for instance)
> >> >
> >> > >
> >> > > Once you see a batch, you're guaranteed to see the same batch on
> >> retries.
> >> >
> >> > +1
> >> >
> >> > >
> >> > > You want to then idempotently insert this batch into some backend.
> >> Things
> >> > > may fail, workers may crash, but in that case you want to get the
> exact
> >> > > same batch back so you can insert it again.
> >> >
> >> > +1
> >> >
> >> > >
> >> > > Do you care about ordering? On failure do you have to see the same
> >> > batches
> >> > > in the same order as before, or is it sufficient to see the same
> >> batches?
> >> >
> >> > Beam doesn't guarantee ordering everywhere, so I guess it is not important
> >> > - at least for my cases this statement is true.
> >> >
> >> > >
> >> > > Reuven
> >> > >
> >> > > On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau <
> >> > rmannibu...@gmail.com>
> >> > > wrote:
> >> > >
> >> > >> Overall goal is to ensure each 100 elements max, a "backend" (as
> >> > >> datastore) flush/commit/push is done and is aligned with beam
> >> > >> checkpoints. You can see it as bringing the "general"
> commit-interval
> >> > >> notion to beam and kind of get rid of the bundle notion which is
> >> > >> almost impossible to use today.
> >> > >>
> >> > >> Romain Manni-Bucau
> >> > >> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> > >>
> >> > >>
> >> > >> 2017-11-15 10:27 GMT+01:00 Reuven Lax :
> >> > >> > It's in the dev list archives, not sure if there's a doc yet.
> >> > >> >
> >> > >> > I'm not quite sure I understand what you mean by a "flush" Can
> you
> >> > >> describe
> >> > >> > the problem you're trying to solve?
> >> > >> >
> >> > >> > Reuven
> >> > >> >
> >> > >> > On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau <
> >> > >> rmannibu...@gmail.com>
> >> > >> > wrote:
> >> > >> >
> >> > >> >> Hmm, I didn't find the doc - if you have the link not far it
> would
> >> be
> >> > >> >> appreciated 

Re: [VOTE] Release 2.2.0, release candidate #3

2017-11-16 Thread Reuven Lax
Retrying the whole step succeeded, so somehow this was an ephemeral error.

On Thu, Nov 16, 2017 at 6:28 PM, Reuven Lax  wrote:

> I've fixed the Python issue - turns out my local path got messed up.
>
> However, mvn release:prepare is now failing with the following. I haven't
> seen this failure before - does anyone know what might be causing it?
>
> [*ERROR*] Failed to execute goal org.apache.maven.plugins:
> maven-release-plugin:2.5.3:prepare *(default-cli)* on project beam-parent:
> *An error is occurred in the checkin process: Exception while executing
> SCM command.*: Detecting the current branch failed: fatal: ref HEAD is
> not a symbolic ref -> *[Help 1]*
>
>
>
>
> On Thu, Nov 16, 2017 at 10:11 AM, Reuven Lax  wrote:
>
>> This is with the CP of Eugene's PR. However Eugene's PR does not touch
>> anything Python.
>>
>> On Thu, Nov 16, 2017 at 10:10 AM, Reuven Lax  wrote:
>>
>>>
>>> mvn -Prelease clean install
>>>
>>>
>>> [*INFO*] *--- *exec-maven-plugin:1.5.0:exec *(setuptools-clean)* @
>>> beam-sdks-python* ---*
>>>
>>> Could not find platform independent libraries 
>>>
>>> Could not find platform dependent libraries 
>>>
>>> Consider setting $PYTHONHOME to [:]
>>>
>>> ImportError: No module named site
>>>
>>> [*ERROR*] Command execution failed.
>>>
>>> org.apache.commons.exec.ExecuteException: Process exited with an error:
>>> 1 (Exit value: 1)
>>>
>>> at org.apache.commons.exec.DefaultExecutor.executeInternal(Defa
>>> ultExecutor.java:404)
>>>
>>> at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecu
>>> tor.java:166)
>>>
>>> at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764)
>>>
>>> at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711)
>>>
>>> at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289)
>>>
>>> at org.apache.maven.plugin.DefaultBuildPluginManager.executeMoj
>>> o(DefaultBuildPluginManager.java:134)
>>>
>>> at org.apache.maven.lifecycle.internal.MojoExecutor.execute(Moj
>>> oExecutor.java:208)
>>>
>>> at org.apache.maven.lifecycle.internal.MojoExecutor.execute(Moj
>>> oExecutor.java:154)
>>>
>>> at org.apache.maven.lifecycle.internal.MojoExecutor.execute(Moj
>>> oExecutor.java:146)
>>>
>>> at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.b
>>> uildProject(LifecycleModuleBuilder.java:117)
>>>
>>> at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.b
>>> uildProject(LifecycleModuleBuilder.java:81)
>>>
>>> at org.apache.maven.lifecycle.internal.builder.singlethreaded.S
>>> ingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
>>>
>>> at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>>> (LifecycleStarter.java:128)
>>>
>>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:309)
>>>
>>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:194)
>>>
>>> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:107)
>>>
>>> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:993)
>>>
>>> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:345)
>>>
>>> at org.apache.maven.cli.MavenCli.main(MavenCli.java:191)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>>
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>
>>> at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnha
>>> nced(Launcher.java:289)
>>>
>>> at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Lau
>>> ncher.java:229)
>>>
>>> at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithEx
>>> itCode(Launcher.java:415)
>>>
>>> at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launc
>>> her.java:356)
>>>
>>> On Thu, Nov 16, 2017 at 5:02 AM, Charles Chen 
>>> wrote:
>>>
 Could you send the command you used that produced this error?  I can't
 reproduce it at the tip of the release-2.2.0 branch.

 On Wed, Nov 15, 2017 at 5:34 AM Reuven Lax 
 wrote:

 > I'm trying to do the last CP and cut RC4, but I'm getting a
 compilation
 > failure in Python - "ImportError: No module named site"
 >
 > Did we possibly break the release branch on one of the Python CPs?
 >
 > Reuven
 >
 > On Sun, Nov 12, 2017 at 5:12 PM, Jean-Baptiste Onofré <
 j...@nanthrax.net>
 > wrote:
 >
 > > Hi Reuven,
 > >
 > > +1 for RC4, and don't worry: it's part of the process. I prefer to have a
 > > long release process rather than a crappy release ;) That's exactly the
 > > purpose of review & vote.
 > >
 > > I definitely think that having releases more often will reduce such
 kind
 > > of issue.
 > >
 > > Regards
 > > JB
 > >
 > >
 > > On 11/12/2017 09:04 AM, Reuven 

Re: [VOTE] Release 2.2.0, release candidate #3

2017-11-16 Thread Reuven Lax
I've fixed the Python issue - turns out my local path got messed up.

However, mvn release:prepare is now failing with the following. I haven't
seen this failure before - does anyone know what might be causing it?

[*ERROR*] Failed to execute goal
org.apache.maven.plugins:maven-release-plugin:2.5.3:prepare *(default-cli)*
on project beam-parent: *An error is occurred in the checkin process:
Exception while executing SCM command.*: Detecting the current branch
failed: fatal: ref HEAD is not a symbolic ref -> *[Help 1]*




On Thu, Nov 16, 2017 at 10:11 AM, Reuven Lax  wrote:

> This is with the CP of Eugene's PR. However Eugene's PR does not touch
> anything Python.
>
> On Thu, Nov 16, 2017 at 10:10 AM, Reuven Lax  wrote:
>
>>
>> mvn -Prelease clean install
>>
>>
>> [*INFO*] *--- *exec-maven-plugin:1.5.0:exec *(setuptools-clean)* @
>> beam-sdks-python* ---*
>>
>> Could not find platform independent libraries 
>>
>> Could not find platform dependent libraries 
>>
>> Consider setting $PYTHONHOME to [:]
>>
>> ImportError: No module named site
>>
>> [*ERROR*] Command execution failed.
>>
>> org.apache.commons.exec.ExecuteException: Process exited with an error:
>> 1 (Exit value: 1)
>>
>> at org.apache.commons.exec.DefaultExecutor.executeInternal(Defa
>> ultExecutor.java:404)
>>
>> at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecu
>> tor.java:166)
>>
>> at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764)
>>
>> at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711)
>>
>> at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289)
>>
>> at org.apache.maven.plugin.DefaultBuildPluginManager.executeMoj
>> o(DefaultBuildPluginManager.java:134)
>>
>> at org.apache.maven.lifecycle.internal.MojoExecutor.execute(Moj
>> oExecutor.java:208)
>>
>> at org.apache.maven.lifecycle.internal.MojoExecutor.execute(Moj
>> oExecutor.java:154)
>>
>> at org.apache.maven.lifecycle.internal.MojoExecutor.execute(Moj
>> oExecutor.java:146)
>>
>> at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.b
>> uildProject(LifecycleModuleBuilder.java:117)
>>
>> at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.b
>> uildProject(LifecycleModuleBuilder.java:81)
>>
>> at org.apache.maven.lifecycle.internal.builder.singlethreaded.S
>> ingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
>>
>> at org.apache.maven.lifecycle.internal.LifecycleStarter.execute
>> (LifecycleStarter.java:128)
>>
>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:309)
>>
>> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:194)
>>
>> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:107)
>>
>> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:993)
>>
>> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:345)
>>
>> at org.apache.maven.cli.MavenCli.main(MavenCli.java:191)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>>
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnha
>> nced(Launcher.java:289)
>>
>> at org.codehaus.plexus.classworlds.launcher.Launcher.launch(
>> Launcher.java:229)
>>
>> at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithEx
>> itCode(Launcher.java:415)
>>
>> at org.codehaus.plexus.classworlds.launcher.Launcher.main(
>> Launcher.java:356)
>>
>> On Thu, Nov 16, 2017 at 5:02 AM, Charles Chen 
>> wrote:
>>
>>> Could you send the command you used that produced this error?  I can't
>>> reproduce it at the tip of the release-2.2.0 branch.
>>>
>>> On Wed, Nov 15, 2017 at 5:34 AM Reuven Lax 
>>> wrote:
>>>
>>> > I'm trying to do the last CP and cut RC4, but I'm getting a compilation
>>> > failure in Python - "ImportError: No module named site"
>>> >
>>> > Did we possibly break the release branch on one of the Python CPs?
>>> >
>>> > Reuven
>>> >
>>> > On Sun, Nov 12, 2017 at 5:12 PM, Jean-Baptiste Onofré >> >
>>> > wrote:
>>> >
>>> > > Hi Reuven,
>>> > >
>>> > > +1 for RC4, and don't worry: it's part of the process. I prefer to have a
>>> > > long release process rather than a crappy release ;) That's exactly the
>>> > > purpose of review & vote.
>>> > >
>>> > > I definitely think that having releases more often will reduce such
>>> kind
>>> > > of issue.
>>> > >
>>> > > Regards
>>> > > JB
>>> > >
>>> > >
>>> > > On 11/12/2017 09:04 AM, Reuven Lax wrote:
>>> > >
>>> > >> I definitely appreciate the frustration about how long this release is
>>> > >> taking. It's verging on ridiculous at this point, and we need
>>> > >> to fix some of the things that caused us to get to this state (for one
>>> > >> thing our 

Jenkins build is back to stable : beam_Release_NightlySnapshot #595

2017-11-16 Thread Apache Jenkins Server
See