Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Thank you for the prompt replies. It's great to see that there is a good 
understanding of how EOS in Flink works.


This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done. 


I don't think that works because we have no control over the Kafka 
transactions. Imagine:


1) ExactlyOnceWriter writes records to Kafka and commits, then starts a 
new transaction.
2) Flink checkpoints, delaying the processing of elements, the 
checkpoint fails.
3) We restore from an old checkpoint and will start writing duplicate 
data to Kafka. The de-duplication that the sink performs does not help, 
especially because the random shard ids might be assigned differently.


IMHO we have to have control over the commit to be able to provide EOS.

When we discussed this in Aug 2017, the understanding was that the two-phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.


That's also my understanding, unless we change the interface.


I don't see how SDF solves this problem.


SDF has a checkpoint method which the Runner can call, but I think 
you are right that the above problem would be the same.



Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will 
help many future exactly-once sinks, and address the fundamental incompatibility 
between the Beam model and Flink's horizontal checkpointing for such applications.


Great :)


The FlinkRunner would need to insert the "wait until checkpoint finalization" 
logic wherever it sees @RequiresStableInput, which is already what it would have to do.


I don't think that fixes the problem. See above example.

Thanks,
Max

On 01.03.19 00:04, Raghu Angadi wrote:



On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:


On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:

I'm not sure what a hard fail is. I probably have a shallow
understanding, but doesn't @RequiresStableInput work for 2PC?
The preCommit() phase should establish the transaction and
commit() is not called until after checkpoint finalization. Can
you describe the way that it does not work a little bit more?


- preCommit() is called before the checkpoint. Kafka EOS in Flink starts
the transaction before this and makes sure it flushes all records in
preCommit(). So far so good.
- commit() is called after the checkpoint is persisted. Now, imagine
commit() fails for some reason. There is no option to rerun the 1st
phase to write the records again in a new transaction. This is a
hard failure for the job. In practice Flink might attempt to
commit again (not sure how many times), which is likely to fail and
eventually results in job failure.
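
For illustration, a minimal Java sketch of the lifecycle described here,
modeled on Flink's TwoPhaseCommitSinkFunction (the names are hypothetical,
not the actual Flink or Beam code):

interface TwoPhaseCommitLifecycle<TXN> {
  TXN beginTransaction();   // open a new Kafka transaction up front
  void preCommit(TXN txn);  // on checkpoint: flush all pending records into txn
  void commit(TXN txn);     // after the checkpoint persists; must not hard-fail,
                            // since the records cannot be rewritten elsewhere
  void abort(TXN txn);      // on failure/restore: roll the transaction back
}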


In Apache Beam, the records could be stored in state and written 
inside commit() to work around this issue. It could have scalability 
issues if checkpoints are not frequent enough in the Flink runner.
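
For illustration, a rough sketch of that idea with Beam's state API
(hypothetical code, not the actual KafkaExactlyOnceSink):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BufferingKafkaWriter extends DoFn<KV<Integer, String>, Void> {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<Integer, String> record,
      @StateId("buffer") BagState<String> buffer) {
    buffer.add(record.getValue());  // buffered in state, not yet written to Kafka
  }

  // At commit time (e.g. when the runner signals checkpoint finalization),
  // buffer.read() would be replayed into a fresh Kafka transaction, so a
  // failed commit() can be retried from state.
}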


Raghu.



Kenn

On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:

On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:

I believe the way you would implement the logic behind
Flink's KafkaProducer would be to have two steps:

1. Start transaction
2. @RequiresStableInput Close transaction


I see. What happens if closing the transaction fails in
(2)? Flink's 2PC requires that commit() should never hard
fail once preCommit() succeeds. I think that is the cost of not
having an extra shuffle. It is alright since this policy has
worked well for Flink so far.

Overall, it will be great to have @RequiresStableInput
support in Flink runner.

Raghu.

The FlinkRunner would need to insert the "wait until
checkpoint finalization" logic wherever it
sees @RequiresStableInput, which is already what it
would have to do.

This matches the KafkaProducer's logic - delay closing
the transaction until checkpoint finalization. This
answers my main question, which is "is
@RequiresStableInput expressive enough to allow
Beam-on-Flink to have exactly once behavior with the
same performance characteristics as native Flink
checkpoint finalization?"

Kenn

[1] https://github.com/apache/beam/pull/7955


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Just realized that transactions do not span multiple elements in 
KafkaExactlyOnceSink. So the proposed solution to stop processing 
elements while a snapshot is pending would work.


It is certainly not optimal in terms of performance for Flink and poses 
problems when checkpoints take long to complete, but it would be 
worthwhile to implement this to make use of the EOS feature.


Thanks,
Max


Re: [VOTE] Release 2.11.0, release candidate #2

2019-03-01 Thread Ahmet Altay
+1 (binding)

On Tue, Feb 26, 2019 at 9:18 PM Kenneth Knowles  wrote:

> +1 (binding)
>
> Ran run_rc_validation.sh and said "yes" to everything up to one mobile
> gaming example. I did not get to the Python tests towards the end of the
> script (it is the same as last RC anyhow? I did not verify this)
>
> On Tue, Feb 26, 2019 at 2:16 PM Charles Chen  wrote:
>
>> Thank you, +1.  I tested Python 3 support in batch and streaming mode
>> (using wordcount and streaming wordcount) on both DirectRunner and
>> DataflowRunner.
>>
>> On Tue, Feb 26, 2019 at 7:54 AM Konstantinos Katsiapis <
>> katsia...@google.com> wrote:
>>
>>> +1.
>>> (Same rationale as my earlier post for RC1).
>>>
>>> On Tue, Feb 26, 2019 at 2:19 AM Maximilian Michels 
>>> wrote:
>>>
 +1 (binding)

 * Verified checksums
 * Ran quickstart WordCount tests local/cluster with the Flink Runner

 -Max

 On 26.02.19 10:40, Ahmet Altay wrote:
 > Hi everyone,
 >
 > Please review and vote on the release candidate #2 for the version
 > 2.11.0, as follows:
 >
 > [ ] +1, Approve the release
 > [ ] -1, Do not approve the release (please provide specific comments)
 >
 > The complete staging area is available for your review, which
 includes:
 > * JIRA release notes [1],
 > * the official Apache source release to be deployed to
 dist.apache.org
 >  [2], which is signed with the key with
 > fingerprint 64B84A5AD91F9C20F5E9D9A7D62E71416096FA00 [3],
 > * all artifacts to be deployed to the Maven Central Repository [4],
 > * source code tag "v2.11.0-RC2" [5],
 > * website pull request listing the release [6] and publishing the API
 > reference manual [7].
 > * Python artifacts are deployed along with the source release to the
 > dist.apache.org  [2].
 > * Validation sheet with a tab for 2.11.0 release to help with
 validation
 > [8].
 >
 > The vote will be open for at least 72 hours. It is adopted by
 majority
 > approval, with at least 3 PMC affirmative votes.
 >
 > Thanks,
 > Ahmet
 >
 > [1]
 >
 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12344775
 > [2] https://dist.apache.org/repos/dist/dev/beam/2.11.0/
 > [3] https://dist.apache.org/repos/dist/release/beam/KEYS
 > [4]
 https://repository.apache.org/content/repositories/orgapachebeam-1064/
 > [5] https://github.com/apache/beam/tree/v2.11.0-RC2
 > [6] https://github.com/apache/beam/pull/7924
 > [7] https://github.com/apache/beam-site/pull/587
 > [8]
 >
 https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=542393513
 >

>>>
>>>
>>> --
>>> Gus Katsiapis | Software Engineer | katsia...@google.com | 650-918-7487
>>>
>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Circling back to the RequiresStableInput annotation[1]. I've done some 
prototyping to see how this could be integrated into Flink. I'm currently 
writing a test based on RequiresStableInput.


I found out there are already checks in place at the Runners to throw in 
case transforms use RequiresStableInput and it's not supported. However, 
not a single transform actually uses the annotation.


It seems that the effort stopped at some point? Would it make sense to 
start annotating KafkaExactlyOnceSink with @RequiresStableInput? We 
could then get rid of the whitelist.
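
For reference, usage would look roughly like this once the annotation is
finished (a sketch, assuming the annotation lands on the @ProcessElement
method as in [1]):

class ExactlyOnceWriterFn extends DoFn<KV<Integer, String>, Void> {
  @RequiresStableInput  // the runner must make the input replay-stable first
  @ProcessElement
  public void processElement(ProcessContext context) {
    // now safe to write to Kafka transactionally
  }
}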


-Max

[1] 
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM 




Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Reuven Lax
Are you suggesting that RequiresStableInput doesn't work with Flink in
general? Or are your concerns specific to the Kafka connector?

RequiresStableInput was originally motivated by other sinks like TextIO,
where you also want to delay creating the final output files until you are
sure their content is stable. Another example is the BigQuery streaming
sink. That sink generates random record ids to send to BigQuery to ensure
exactly-once inserts (BigQuery uses this id for deduplication). If records
are replayed into the sink after data has already been written to BigQuery,
then the sink will generate new random record ids, so the duplicate output
will not be detected. These were the original motivations for
RequiresStableInput, to tell the runner that the input to the transform
must be "stable" before proceeding. In Dataflow a Reshuffle does this, as
Dataflow's shuffle is persistent. For other runners, different techniques
will be needed.
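
As a sketch (not the actual BigQueryIO code; "rows" is an assumed
PCollection<TableRow>), the pattern is:

// Assign a random insert id, then reshuffle. On Dataflow the shuffle is
// persistent, so replays downstream of it see the same id and BigQuery
// can deduplicate.
PCollection<KV<String, TableRow>> keyed = rows.apply(
    ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(KV.of(UUID.randomUUID().toString(), c.element()));
      }
    }));
PCollection<KV<String, TableRow>> stable = keyed.apply(Reshuffle.of());
// "stable" is then written with the key as BigQuery's insertId.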

BTW, the Reshuffle currently in KafkaIO possibly should be removed and
replaced with a RequiresStableInput annotation once that is implemented.

Reuven


Re: [VOTE] Release 2.11.0, release candidate #2

2019-03-01 Thread Ahmet Altay
Thank you everyone for voting.

The vote has passed with 5 supportive +1 votes, 4 of which are binding PMC
votes:

* Maximilian Michels
* Reuven Lax
* Kenneth Knowles
* Ahmet Altay

I will proceed with release finalization steps.

Ahmet



Re: [VOTE] Release 2.11.0, release candidate #2

2019-03-01 Thread Connell O'Callaghan
Excellent thank you Ahmet and all involved!!!


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Reuven Lax
Yeah, the person who was working on it originally stopped working on Beam,
and nobody else ever finished it. I think it is important to finish though.
Many of the existing Sinks are only fully correct for Dataflow today,
because they insert either a Reshuffle or a GroupByKey to ensure input
stability before outputting (in many cases this code was inherited from
before Beam existed). On Flink today, these sinks might occasionally
produce duplicate output in the case of failures.

Reuven


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Fully agree. I think we can improve the situation drastically. For 
KafkaIO EOS with Flink we need to make these two changes:


1) Introduce buffering while the checkpoint is being taken
2) Replace the random shard id assignment with something deterministic
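
For (2), a minimal sketch of what a deterministic assignment could look
like (illustrative only, not the actual KafkaExactlyOnceSink code):

// Derive the shard from the element key instead of drawing a random
// number, so a replay after restore assigns the same shard.
static int assignShard(String key, int numShards) {
  return Math.floorMod(key.hashCode(), numShards);
}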

However, we won't be able to provide full compatibility with 
RequiresStableInput because Flink only guarantees stable input after a 
checkpoint. RequiresStableInput requires input at any point in time to 
be stable. IMHO the only way to achieve that is materializing output, 
which Flink does not currently support.


KafkaIO does not need all the power of RequiresStableInput to achieve 
EOS with Flink, but for the general case I don't see a good solution at 
the moment.


-Max


Re: Added a Jira beginner's guide to the wiki.

2019-03-01 Thread Daniel Oliveira
That's really useful, Udi. I'm not sure if it would fit the beginner's guide
but it's definitely worth writing down, so I made a "Jira Tips" page and
added it there:
https://cwiki.apache.org/confluence/display/BEAM/Jira+Tips

On Wed, Feb 27, 2019 at 7:40 PM Kenneth Knowles  wrote:

> Genius. I love it. This will save me so much clicking time.
>
> Kenn
>
> On Wed, Feb 27, 2019 at 5:20 PM Udi Meiri  wrote:
>
>> My favorite way to navigate JIRA is using a Chrome search engine.
>> You configure it like this:
>> [image: Screenshot from 2019-02-27 17-11-26.png]
>> (URL is:
>> https://issues.apache.org/jira/secure/QuickSearch.jspa?searchString=%s)
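>>
>> (The screenshot boils down to roughly: Chrome Settings > Search engines >
>> Add, with a name such as "JIRA", the keyword "j", and the URL above, where
>> %s is the query placeholder.)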
>>
>> And search by writing in the location bar:
>> "j BEAM-1234" will take you to that specific issue
>> "j beam unresolved udim" will show all unresolved issues assigned to udim
>>
>>
>> On Tue, Feb 26, 2019 at 9:22 PM Ahmet Altay  wrote:
>>
>>> Thank you Daniel, this is great information.
>>>
>>> On Fri, Feb 22, 2019 at 11:47 AM Daniel Oliveira 
>>> wrote:
>>>
 Hi everyone,

 In a recent thread in this list I mentioned that it might be nice to
 have a short guide for our Jira on the wiki since there were some aspects
of Jira that I found a bit unintuitive or not discoverable when I was
 getting into the project. I went ahead and wrote one up and would
 appreciate some feedback, especially from any contributors that may be new
 to Beam and/or Jira.


 https://cwiki.apache.org/confluence/display/BEAM/Beam+Jira+Beginner%27s+Guide

 The main two aspects that I want to make sure I got right are:

 1. Covering details that are often confusing for new contributors, such
 as ways Beam uses Jira that might be unique, or just unintuitive features.

 2. Keeping it very brief and duplicating as little documentation as
 possible. I don't want this to get outdated, so I'd much rather link to a
 source of truth when possible.

 If anyone has any details I missed that they'd like to add, or feel
 that they could edit the guide a bit to keep it brief and cut out
 unnecessary info, please go ahead. Also, I'm hoping that this guide could
be linked from the Contribution Guide on the website if people find it
 useful, so feedback on that front would be great too.

>>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Reuven Lax
On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels  wrote:

> Fully agree. I think we can improve the situation drastically. For
> KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>

Can we do 2? I seem to remember that we had trouble in some cases (e.g. in
the BigQuery case, there was no obvious way to create a deterministic id,
which is why we went for a random number followed by a reshuffle). Also
remember that the user ParDo that is producing data to the sink is not
guaranteed to be deterministic; the Beam model allows for non-deterministic
transforms.


>
> However, we won't be able to provide full compatibility with
> RequiresStableInput because Flink only guarantees stable input after a
> checkpoint. RequiresStableInput requires input at any point in time to
> be stable.


I'm not quite sure I understand. If a ParDo is marked with
RequiresStableInput, can't the flink runner buffer the input message until
after the checkpoint is complete and only then deliver it to the ParDo?
This adds latency of course, but I'm not sure how else to do things
correctly with the Beam model.
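
A minimal sketch of that buffering, assuming Flink's CheckpointListener
callback (illustrative only; a real runner would keep the buffer in
checkpointed operator state rather than a plain list):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

class HoldUntilCheckpointComplete<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T>, CheckpointListener {

  private final List<T> buffered = new ArrayList<>();

  @Override
  public void processElement(StreamRecord<T> element) {
    buffered.add(element.getValue());  // hold back instead of emitting
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Input received before this checkpoint is now stable; release it to
    // the @RequiresStableInput ParDo downstream. (Simplified: assumes a
    // single outstanding checkpoint.)
    for (T value : buffered) {
      output.collect(new StreamRecord<>(value));
    }
    buffered.clear();
  }
}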


> IMHO the only way to achieve that is materializing output
> which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve
> EOS with Flink, but for the general case I don't see a good solution at
> the moment.
>
> -Max
>

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Kenneth Knowles
I think I am fundamentally misunderstanding checkpointing in Flink.

If you randomly generate shard ids, buffer those until finalization,
finalize a checkpoint so that you never need to re-run that generation,
isn't the result stable from that point onwards?

Kenn


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Raghu Angadi
Great. For now, we could write a new KafkaIO EOS sink, without any connection
to the current design, that would work correctly with the Flink Runner. After that I
think we can figure out how to reconcile the two.


[BEAM-6759] CassandraIOTest failing in presubmit in multiple PRs

2019-03-01 Thread Alex Amato
https://issues.apache.org/jira/browse/BEAM-6759


Hi, I have seen this test failing in presubmit in multiple PRs, which does
not seem to be related to the changes. Any ideas why this is failing at the
moment?

CassandraIOTest - scans

https://builds.apache.org/job/beam_PreCommit_Java_Commit/4586/testReport/junit/org.apache.beam.sdk.io.cassandra/CassandraIOTest/classMethod/

https://scans.gradle.com/s/btppkeky63a5g/console-log?task=:beam-sdks-java-io-cassandra:test#L7



java.lang.NullPointerException
    at org.cassandraunit.utils.EmbeddedCassandraServerHelper.dropKeyspacesWithNativeDriver(EmbeddedCassandraServerHelper.java:285)
    at org.cassandraunit.utils.EmbeddedCassandraServerHelper.dropKeyspaces(EmbeddedCassandraServerHelper.java:281)
    at org.cassandraunit.utils.EmbeddedCassandraServerHelper.cleanEmbeddedCassandra(EmbeddedCassandraServerHelper.java:193)
    at org.apache.beam.sdk.io.cassandra.CassandraIOTest.stopCassandra(CassandraIOTest.java:129)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:396)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)


Website tests strangely broken

2019-03-01 Thread Pablo Estrada
Hello all,
the website tests are broken. I've filed BEAM-6760 to track fixing them, but
I wanted to see if anyone has any idea about why it may be failing.

It's been broken for a few days:
https://builds.apache.org/job/beam_PreCommit_Website_Cron/

And looking at the failures, it seems that they represent broken links:
https://builds.apache.org/job/beam_PreCommit_Website_Cron/725/console

But visiting each of the links opens the page without problems.

It may be some temporary environmental issue, but then why would it fail
consistently for the last few days?
Thoughts?
Thanks
-P.


[BEAM-6761] Pydoc is giving cryptic error messages, blocking my PR :(

2019-03-01 Thread Alex Amato
BEAM-6761 

This is blocking my PR at the moment; the output doesn't seem to match the
file, and I am not sure how to proceed.

pydoc Output
https://scans.gradle.com/s/im6t66hhy4bdq/console-log?task=:beam-sdks-python:docs#L3


Files
https://github.com/apache/beam/pull/7936/files




/usr/local/google/home/ajamato/go/src/github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring of apache_beam.testing.metric_result_matchers:13: WARNING: Unexpected indentation.
/usr/local/google/home/ajamato/go/src/github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring of apache_beam.testing.metric_result_matchers:15: WARNING: Block quote ends without a blank line; unexpected unindent.
/usr/local/google/home/ajamato/go/src/github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring of apache_beam.testing.metric_result_matchers:18: WARNING: Definition list ends without a blank line; unexpected unindent.
/usr/local/google/home/ajamato/go/src/github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring of apache_beam.testing.metric_result_matchers:19: WARNING: Definition list ends without a blank line; unexpected unindent.
/usr/local/google/home/ajamato/go/src/github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring of apache_beam.testing.metric_result_matchers:21: WARNING: Unexpected indentation.
/usr/local/google/home/ajamato/go/src/github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring of apache_beam.testing.metric_result_matchers:22: WARNING: Block quote ends without a blank line; unexpected unindent.



= copy of the file in its current state (I will probably modify the PR)


https://pastebin.com/8bWrPZVJ


Re: [BEAM-6761] Pydoc is giving cryptic error messages, blocking my PR :(

2019-03-01 Thread Udi Meiri
I think it's referring to the big comment at the top of the
sdks/python/apache_beam/testing/metric_result_matchers.py.
The line numbers are relative to the beginning of the block.

On Fri, Mar 1, 2019 at 2:21 PM Alex Amato  wrote:

> BEAM-6761 
>
> This is blocking my PR at the moment, the output doesn't seem to match the
> file and I am not sure how to proceed
>
> pydoc Output
>
> https://scans.gradle.com/s/im6t66hhy4bdq/console-log?task=:beam-sdks-python:docs#L3
> 
>
> Files
> https://github.com/apache/beam/pull/7936/files
> 
>
>
>
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
> of apache_beam.testing.metric_result_matchers:13: WARNING: Unexpected
> indentation.
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
> of apache_beam.testing.metric_result_matchers:15: WARNING: Block quote ends
> without a blank line; unexpected unindent.
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
> of apache_beam.testing.metric_result_matchers:18: WARNING: Definition list
> ends without a blank line; unexpected unindent.
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
> of apache_beam.testing.metric_result_matchers:19: WARNING: Definition list
> ends without a blank line; unexpected unindent.
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
> of apache_beam.testing.metric_result_matchers:21: WARNING: Unexpected
> indentation.
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
> of apache_beam.testing.metric_result_matchers:22: WARNING: Block quote ends
> without a blank line; unexpected unindent.
>
>
>
> = copy of the file in its current state (I will probably modify the PR
> 
>
> https://pastebin.com/8bWrPZVJ
>
>
>
>




Re: [BEAM-6761] Pydoc is giving cryptic error messages, blocking my PR :(

2019-03-01 Thread Udi Meiri
Try making it a literal block:

"""MetricResult matchers for validating metrics in PipelineResults.

example usage:
::

result = my_pipeline.run()
all_metrics = result.metrics().all_metrics()

matchers = [
  MetricResultMatcher(
  namespace='myNamespace',
  name='myName',
  step='myStep',
  labels={
  'pcollection': 'myCollection',
  'myCustomKey': 'myCustomValue'
  },
  attempted=42,
  committed=42
  )
]
errors = metric_result_matchers.verify_all(all_metrics, matchers)
self.assertFalse(errors, errors)

"""
https://www.sphinx-doc.org/en/master/usage/restructuredtext/basics.html
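
(For context: the bare "::" line is reStructuredText's literal-block marker.
Everything indented beneath it is rendered verbatim, so Sphinx stops trying
to parse the example as markup and the indentation warnings go away.)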

On Fri, Mar 1, 2019 at 3:31 PM Udi Meiri  wrote:

> I think it's referring to the big comment at the top of the
> sdks/python/apache_beam/testing/metric_result_matchers.py.
> The line numbers are relative to the beginning of the block.
>
> On Fri, Mar 1, 2019 at 2:21 PM Alex Amato  wrote:
>
>> BEAM-6761 
>>
>> This is blocking my PR at the moment, the output doesn't seem to match
>> the file and I am not sure how to proceed
>>
>> pydoc Output
>>
>> https://scans.gradle.com/s/im6t66hhy4bdq/console-log?task=:beam-sdks-python:docs#L3
>> 
>>
>> Files
>> https://github.com/apache/beam/pull/7936/files
>> 
>>
>>
>>
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>> of apache_beam.testing.metric_result_matchers:13: WARNING: Unexpected
>> indentation.
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>> of apache_beam.testing.metric_result_matchers:15: WARNING: Block quote ends
>> without a blank line; unexpected unindent.
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>> of apache_beam.testing.metric_result_matchers:18: WARNING: Definition list
>> ends without a blank line; unexpected unindent.
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>> of apache_beam.testing.metric_result_matchers:19: WARNING: Definition list
>> ends without a blank line; unexpected unindent.
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>> of apache_beam.testing.metric_result_matchers:21: WARNING: Unexpected
>> indentation.
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>> of apache_beam.testing.metric_result_matchers:22: WARNING: Block quote ends
>> without a blank line; unexpected unindent.
>>
>>
>>
>> = copy of the file in its current state (I will probably modify the
>> PR 
>>
>> https://pastebin.com/8bWrPZVJ
>>
>>
>>
>>




Re: Website tests strangely broken

2019-03-01 Thread Ruoyun Huang
The log says it ran against 880 external links, but shows only ~100 failure
messages (and the number varies across runs).  Likely it is just flaky due to
an unstable connection to JIRA's site?

Not sure how our tests are organized, but maybe retrying the HTTP requests
would help? A rough sketch of the idea follows.
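
Using only java.net from the JDK -- the website precommit uses its own link
checker, so this is illustrative rather than the actual test code:

import java.net.HttpURLConnection;
import java.net.URL;

public class LinkCheck {
  // Returns true if the link answers with a non-error status within maxAttempts tries.
  static boolean isAlive(String link, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        HttpURLConnection conn = (HttpURLConnection) new URL(link).openConnection();
        conn.setRequestMethod("HEAD");  // status only, no body download
        conn.setConnectTimeout(5_000);
        conn.setReadTimeout(5_000);
        if (conn.getResponseCode() < 400) {
          return true;
        }
      } catch (Exception e) {
        // treat as transient (reset connection, DNS hiccup) and retry
      }
      try {
        Thread.sleep(1_000L * attempt); // simple linear backoff between attempts
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(isAlive("https://issues.apache.org/jira/browse/BEAM-6760", 3));
  }
}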

On Fri, Mar 1, 2019 at 12:46 PM Pablo Estrada  wrote:

> Hello all,
> the website tests are broken. I've filed BEAM-6760
>  to track fixing them,
> but I wanted to see if anyone has any idea about why it may be failing.
>
> It's been broken for a few days:
> https://builds.apache.org/job/beam_PreCommit_Website_Cron/
>
> And looking at the failures, it seems that they represent broken links:
> https://builds.apache.org/job/beam_PreCommit_Website_Cron/725/console
>
> But looking at each of the links opens their website without problems.
>
> It may be some environmental temporary issue, but why would it fail
> consistently for the last few days then?
> Thoughts?
> Thanks
> -P.
>


-- 

Ruoyun Huang


Re: [BEAM-6761] Pydoc is giving cryptic error messages, blocking my PR :(

2019-03-01 Thread Alex Amato
Yup, copying and pasting your comment worked. Thanks for the tip, Udi! It
helped me out a lot.

I guess this tool ignores all the comments when counting lines? That makes
it hard for us to use, because we have a commented header.

On Fri, Mar 1, 2019 at 3:46 PM Udi Meiri  wrote:

> Try making it a literal block:
>
> """MetricResult matchers for validating metrics in PipelineResults.
>
> example usage:
> ::
>
> result = my_pipeline.run()
> all_metrics = result.metrics().all_metrics()
>
> matchers = [
>   MetricResultMatcher(
>   namespace='myNamespace',
>   name='myName',
>   step='myStep',
>   labels={
>   'pcollection': 'myCollection',
>   'myCustomKey': 'myCustomValue'
>   },
>   attempted=42,
>   committed=42
>   )
> ]
> errors = metric_result_matchers.verify_all(all_metrics, matchers)
> self.assertFalse(errors, errors)
>
> """
> https://www.sphinx-doc.org/en/master/usage/restructuredtext/basics.html
>
> On Fri, Mar 1, 2019 at 3:31 PM Udi Meiri  wrote:
>
>> I think it's referring to the big comment at the top of the
>> sdks/python/apache_beam/testing/metric_result_matchers.py.
>> The line numbers are relative to the beginning of the block.
>>
>> On Fri, Mar 1, 2019 at 2:21 PM Alex Amato  wrote:
>>
>>> BEAM-6761 
>>>
>>> This is blocking my PR at the moment, the output doesn't seem to match
>>> the file and I am not sure how to proceed
>>>
>>> pydoc Output
>>>
>>> https://scans.gradle.com/s/im6t66hhy4bdq/console-log?task=:beam-sdks-python:docs#L3
>>> 
>>>
>>> Files
>>> https://github.com/apache/beam/pull/7936/files
>>> 
>>>
>>>
>>>
>>> /usr/local/google/home/ajamato/go/src/
>>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>>> of apache_beam.testing.metric_result_matchers:13: WARNING: Unexpected
>>> indentation.
>>> /usr/local/google/home/ajamato/go/src/
>>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>>> of apache_beam.testing.metric_result_matchers:15: WARNING: Block quote ends
>>> without a blank line; unexpected unindent.
>>> /usr/local/google/home/ajamato/go/src/
>>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>>> of apache_beam.testing.metric_result_matchers:18: WARNING: Definition list
>>> ends without a blank line; unexpected unindent.
>>> /usr/local/google/home/ajamato/go/src/
>>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>>> of apache_beam.testing.metric_result_matchers:19: WARNING: Definition list
>>> ends without a blank line; unexpected unindent.
>>> /usr/local/google/home/ajamato/go/src/
>>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>>> of apache_beam.testing.metric_result_matchers:21: WARNING: Unexpected
>>> indentation.
>>> /usr/local/google/home/ajamato/go/src/
>>> github.com/apache/beam/sdks/python/apache_beam/testing/metric_result_matchers.py:docstring
>>> of apache_beam.testing.metric_result_matchers:22: WARNING: Block quote ends
>>> without a blank line; unexpected unindent.
>>>
>>>
>>>
>>> = copy of the file in its current state (I will probably modify the
>>> PR 
>>>
>>> https://pastebin.com/8bWrPZVJ
>>>
>>>
>>>
>>>


Re: Beam Summits!

2019-03-01 Thread Thomas Weise
Update: organizers are looking for new dates for the Summit in SF,
currently trending towards October.

For Beam Summit Europe see:
https://twitter.com/matthiasbaetens/status/1098854758893273088

On Wed, Jan 23, 2019 at 9:09 PM Austin Bennett 
wrote:

> Hi All,
>
> PMC approval is still pending for the Summit in SF (so things may change), but I
> wanted to get a preliminary CfP out there to start to get a sense of interest
> -- given that the targeted dates are approaching.  Much of this
> delay/uncertainty is my fault; I should have done more before the holidays
> and my long vacation from the end of December through mid-January.  This CfP
> will remain open for some time, and upon approval I will make sure to
> give notice of a CfP deadline.
>
> Please submit talks via:
>
> https://docs.google.com/forms/d/e/1FAIpQLSfD0qhoS2QrDbtK1E85gATGQCgRGKhQcLIkiiAsPW9G_7Um_Q/viewform?usp=sf_link
>
> I would very much encourage anyone who can lead hands-on
> tutorials/workshops -- full-day, half-day, a focused couple of hours,
> etc. -- to apply, as well as any technical talks and/or use cases.  Again,
> the tentative dates are 3 and 4 April 2019.
>
> Thanks,
> Austin
>
>
> On Mon, Jan 21, 2019 at 7:58 PM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> Hi All,
>>
>> Other projects/Summits like Kafka and Spark offer add-on days to summits
>> for training.  I'm wondering about the appetite/interest for hands-on
>> sessions for working with Beam, and whether we think that'd be helpful.  Are
>> there people that would benefit from a "beginning with Beam" day, or a more
>> advanced/specialized session?  This was on the original agenda for London
>> but hadn't materialized; I'm seeing if there is enough interest to make this
>> worth putting together and making available.
>>
>> Furthermore, it had been mentioned that an introduction to contributing
>> to Beam might also be beneficial.  I'm also curious to hear whether that
>> would be of interest to people here, or to people that readers here know
>> but who aren't following these distribution channels themselves -- since
>> followers of dev@ or even user@ are potentially a more self-selected group
>> of those already interested in Beam.
>>
>> Thanks,
>> Austin
>>
>>
>>
>> On Wed, Dec 19, 2018 at 3:05 PM Austin Bennett <
>> whatwouldausti...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I really enjoyed Beam Summit in London (Thanks Matthias!), and there was
>>> much enthusiasm for continuations.  We had selected that location in
>>> large part due to the growing community there, and we have users in a
>>> variety of locations.  In our 2019 calendar,
>>> https://docs.google.com/spreadsheets/d/1CloF63FOKSPM6YIuu8eExjhX6xrIiOp5j4zPbSg3Apo/
>>> shared in the past weeks, 3 Summits are tentatively slotted for this year.
>>> I want to start running this by the group to get input.
>>>
>>> * Beam Summit NA, in San Francisco, approx 3 April 2019 (following Flink
>>> Forward).  I can organize.
>>> * Beam Summit Europe, in Stockholm; this was the runner-up in the voting,
>>> behind London.  Or perhaps Berlin?  October-ish 2019
>>> * Beam Summit Asia, in Tokyo ??
>>>
>>> What are general thoughts on locations/dates?
>>>
>>> Looking forward to convening in person soon.
>>>
>>> Cheers,
>>> Austin
>>>
>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  wrote:

> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:
>
>> I'm not sure what a hard fail is. I probably have a shallow
>> understanding, but doesn't @RequiresStableInput work for 2PC? The
>> preCommit() phase should establish the transaction and commit() is not
>> called until after checkpoint finalization. Can you describe the way that
>> it does not work a little bit more?
>>
>
> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the
> transaction before this and makes sure it flushes all records in
> preCommit(). So far good.
> - commit is called after checkpoint is persisted. Now, imagine commit()
> fails for some reason. There is no option to rerun the 1st phase to write
> the records again in a new transaction. This is a hard failure for the
> job. In practice Flink might attempt to commit again (not sure how many
> times), which is likely to fail and eventually results in job failure.
>

Btw, just to clarify: the above failure case is not related to Beam or
@RequiresStableInput.
It is how Flink's 2PC utility, used by its EOS Kafka sink, is designed.
preCommit() can't be rerun once the checkpoint has been taken, so the commit()
phase needs to be self-sufficient to handle failures and reruns. A rough
sketch of that contract is below.
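
The sketch, assuming Flink's TwoPhaseCommitSinkFunction API; Txn is a
hypothetical stand-in for a transactional producer handle, not Flink's actual
Kafka producer code:

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Hypothetical transactional handle; a real one would talk to the broker.
class Txn {
  static Txn open() { return new Txn(); }  // start a new transaction
  void write(String record) {}             // buffer into the open transaction
  void flush() {}                          // push buffered records out
  void commit() {}                         // finalize; must tolerate retries
  void rollback() {}                       // discard the transaction
}

public class SketchTwoPhaseSink
    extends TwoPhaseCommitSinkFunction<String, Txn, Void> {

  public SketchTwoPhaseSink(TypeSerializer<Txn> txnSerializer,
                            TypeSerializer<Void> contextSerializer) {
    super(txnSerializer, contextSerializer);
  }

  @Override
  protected Txn beginTransaction() {
    return Txn.open();  // one transaction per checkpoint interval
  }

  @Override
  protected void invoke(Txn txn, String value, Context context) {
    txn.write(value);   // records go into the open transaction
  }

  @Override
  protected void preCommit(Txn txn) {
    // Runs before the checkpoint barrier completes; cannot be re-run afterwards.
    txn.flush();
  }

  @Override
  protected void commit(Txn txn) {
    // Runs after the checkpoint is persisted; may be retried on recovery,
    // so it must tolerate reruns (the "self-sufficient" part).
    txn.commit();
  }

  @Override
  protected void abort(Txn txn) {
    txn.rollback();
  }
}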



>
>
>> Kenn
>>
>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>>
>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles 
>>> wrote:
>>>
 I believe the way you would implement the logic behind Flink's
 KafkaProducer would be to have two steps:

 1. Start transaction
 2. @RequiresStableInput Close transaction

>>>
>>> I see.  What happens if closing the transaction fails in (2)? Flink's
>>> 2PC requires that commit() should never hard fail once preCommit()
>>> succeeds. I think that is the cost of not having an extra shuffle. It is
>>> alright since this policy has worked well for Flink so far.
>>>
>>> Overall, it will be great to have @RequiresStableInput support in Flink
>>> runner.
>>>
>>> Raghu.
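
For illustration, a minimal sketch of the two-step pattern Kenn outlines
above, assuming Beam's Java SDK and its @RequiresStableInput annotation.
TxnWriter and the transaction-id handoff are hypothetical stand-ins, not the
actual KafkaExactlyOnceSink code:

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical stand-in for a transactional producer.
class TxnWriter {
  static String writeToOpenTxn(String record) { return "txn-1"; }
  static void commit(String txnId) {}
}

// Step 1: write each element into an open transaction, emit the txn id.
class StartTxnFn extends DoFn<String, String> {
  @ProcessElement
  public void process(ProcessContext c) {
    c.output(TxnWriter.writeToOpenTxn(c.element()));
  }
}

// Step 2: commit only once the input is stable, i.e. the runner guarantees
// it will replay exactly these elements after a failure (on Flink, this is
// where "wait until checkpoint finalization" would apply).
class CommitTxnFn extends DoFn<String, Void> {
  @RequiresStableInput
  @ProcessElement
  public void process(ProcessContext c) {
    TxnWriter.commit(c.element()); // should tolerate retries
  }
}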
>>>
 The FlinkRunner would need to insert the "wait until checkpoint
 finalization" logic wherever it sees @RequiresStableInput, which is already
 what it would have to do.

 This matches the KafkaProducer's logic - delay closing the transaction
 until checkpoint finalization. This answers my main question, which is "is
 @RequiresStableInput expressive enough to allow Beam-on-Flink to have
 exactly once behavior with the same performance characteristics as native
 Flink checkpoint finalization?"

 Kenn

 [1] https://github.com/apache/beam/pull/7955

 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:

>
>
> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi 
> wrote:
>
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>
>>
>>
>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>> wrote:
>>
>>> Hi,
>>>
>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>> exactly-once
>>> semantics (EOS). I think it is questionable to exclude Runners from
>>> inside a transform, but I see that the intention was to save users
>>> from
>>> surprises.
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>> native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>>
>>
>>
>> When we discussed this in Aug 2017, the understanding was that 2
>> Phase commit utility in Flink used to implement Flink's Kafka EOS could 
>> not
>> be implemented in Beam's context.
>> See  this message
>>  in
>> that dev thread. Has anything changed in this regard? The whole thread is
>> relevant to this topic and worth going through.
>>
>
> I think that TwoPhaseCommit utility class wouldn't work. The Flink
> runner would probably want to directly use notifyCheckpointComplete in order
> to implement @RequiresStableInput.
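
For reference, a rough sketch of that idea, assuming Flink's
CheckpointListener interface. This is not the FlinkRunner's actual
implementation; a real operator would also persist the buffers in managed
state so they survive restores:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.flink.runtime.state.CheckpointListener;

public class HoldUntilFinalized<T> implements CheckpointListener {
  private final List<T> pending = new ArrayList<>();                    // since the last barrier
  private final NavigableMap<Long, List<T>> awaiting = new TreeMap<>(); // snapshotted, not finalized

  public void processElement(T element) {
    pending.add(element); // buffer instead of emitting downstream
  }

  // Would be called from the operator's snapshotState().
  public void onSnapshot(long checkpointId) {
    awaiting.put(checkpointId, new ArrayList<>(pending));
    pending.clear();
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Input covered by this (or an earlier) checkpoint is now stable, so it
    // is safe to hand it to the @RequiresStableInput transform.
    NavigableMap<Long, List<T>> ready = awaiting.headMap(checkpointId, true);
    for (List<T> batch : ready.values()) {
      batch.forEach(this::emit);
    }
    ready.clear();
  }

  private void emit(T element) {
    // hand off to downstream processing
  }
}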
>
>>
>>
>>>
>>> A checkpoint is realized by sending barriers through all channels
>>> starting from the source until reaching all sinks. Every operator
>>> persists its state once it has received a barrier on all its input
>>> channels, it then forwards it to the downstream operators.
>>>
>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>
>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>> GroupByKey -> ExactlyOnceWriter
>>>
>>> As I understood, Spark or Dataflow use the GroupByKey stages to
>>> persist
>>> the input. That is not required in Flink to be able to take a
>>> consistent
>>> snapshot of the pip