Re: kafka 0.9 support

2019-04-03 Thread Raghu Angadi
I mean, +1 for removing support for old Kafka versions after the next LTS.

What the cutoff should be for an 'old' version can be discussed then. My
choice would be 0.11.
Raghu.

On Wed, Apr 3, 2019 at 4:36 PM Raghu Angadi  wrote:

> +1 for next LTS.
>
> On Wed, Apr 3, 2019 at 2:30 PM Ismaël Mejía  wrote:
>
>> We should focus on the main reason to remove the Kafka 0.9 support. I
>> have the impression that this is mostly to ease the maintenance, but
>> from the current status (and the removal PR [1]), it does not seem
>> like it is a burden to continue supporting 0.9. In any case I am +1 to
>> remove the support for 0.9, but maybe it is a good idea to just wait
>> until the next LTS is decided and do it just after. This way we will
>> still cover existing users for some time.
>>
>> Creating different modules for different versions of KafkaIO does not
>> make sense because it is even more complicated than just staying the
>> way we are today for not much in return. We better improve the status
>> quo by parametrizing our current tests to validate that KafkaIO works
>> correctly with the different supported versions (so far we only test
>> against version 1.0.0). I filed BEAM-7003 to track this.
>>
>> [1] https://github.com/apache/beam/pull/8186
>> [2] https://issues.apache.org/jira/browse/BEAM-7003
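To illustrate the kind of test parametrization proposed here for BEAM-7003, a minimal hypothetical sketch (class, topic and test names are made up; the real work is having the build pin each supported kafka-clients version on the test classpath and run the suite once per version):

import static org.junit.Assert.assertNotNull;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.junit.Test;

public class KafkaIoClientVersionSmokeTest {

  @Test
  public void transformsBuildAgainstClientOnClasspath() {
    // AppInfoParser reports whichever kafka-clients jar the build put on the
    // classpath for this run (0.10.1.0, 0.11.0.x, 1.0.0, 2.x, ...).
    assertNotNull(AppInfoParser.getVersion());

    // Building the read/write transforms checks that KafkaIO's builder API is
    // usable with that client; a fuller version matrix would also run
    // pipelines against a broker of the matching version.
    assertNotNull(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("beam-version-smoke")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
    assertNotNull(
        KafkaIO.<Long, String>write()
            .withBootstrapServers("localhost:9092")
            .withTopic("beam-version-smoke")
            .withKeySerializer(LongSerializer.class)
            .withValueSerializer(StringSerializer.class));
  }
}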
>>
>> ps. Actually this discussion brings to the table the issue of
>> removing/deprecating/changing supported versions on parts of the API
>> marked as @Experimental. I will fork a new thread to discuss this.
>>
>> On Wed, Apr 3, 2019 at 6:53 PM Raghu Angadi  wrote:
>> >
>> >
>> >
>> > On Wed, Apr 3, 2019 at 5:46 AM David Morávek 
>> wrote:
>> >>
>> >> I'd say that APIs we use in KafkaIO are pretty much stable since 0.10
>> release, all reflection based compatibility adapters seem to be aimed for
>> 0.9 release (which is 8 major releases behind current Kafka release).
>> >>
>> >> We may take an inspiration from Flink's kafka connector, they maintain
>> separate maven artifact for all supported Kafka APIs. This may be the best
>> approach as we can still share most of the codebase between versions, have
>> compile time checks and also run tests against all of the supported
>> versions.
>> >
>> >
>> > From that page, Flink also moved to a single Kafka connector for versions
>> 1.0.x and newer. Kafka itself seems to have improved compatibility between
>> client and broker versions starting with 0.11. Not sure if there is any need now
>> to maintain multiple versions of KafkaIO for 0.9.x etc. Are you
>> suggesting we should?
>> >
>> > From Flink's page:
>> > "Starting with Flink 1.7, there is a new universal Kafka connector that
>> does not track a specific Kafka major version. Rather, it tracks the latest
>> version of Kafka at the time of the Flink release.
>> >
>> > If your Kafka broker version is 1.0.0 or newer, you should use this
>> Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or
>> 0.8), you should use the connector corresponding to the broker version."
>> >
>> >
>> >>
>> >>
>> >> I'm not really comfortable with reflection based adapters as they seem
>> fragile and don't provide compile time checks.
>> >>
>> >> On Tue, Apr 2, 2019 at 11:27 PM Austin Bennett <
>> whatwouldausti...@gmail.com> wrote:
>> >>>
>> >>> I withdraw my concern -- checked on info on the cluster I will
>> eventually access.  It is on 0.8, so I was speaking too soon.  Can't speak
>> to rest of user base.
>> >>>
>> >>> On Tue, Apr 2, 2019 at 11:03 AM Raghu Angadi 
>> wrote:
>> >>>>
>> >>>> Thanks to David Morávek for pointing out possible improvement to
>> KafkaIO for dropping support for 0.9 since it avoids having a second
>> consumer just to fetch latest offsets for backlog.
>> >>>>
>> >>>> Ideally we should be dropping 0.9 support for next major release, in
>> fact better to drop versions before 0.10.1 at the same time. This would
>> further reduce reflection based calls for supporting multiple versions. If
>> the users still on 0.9 could stay on current stable release of Beam,
>> dropping would not affect them. Otherwise, it would be good to hear from
>> them about how long we need to keep support for old versions.
>> >>>>
>> >>>> I don't think it is good 

Re: kafka 0.9 support

2019-04-03 Thread Raghu Angadi
+1 for next LTS.

On Wed, Apr 3, 2019 at 2:30 PM Ismaël Mejía  wrote:

> We should focus on the main reason to remove the Kafka 0.9 support. I
> have the impression that this is mostly to ease the maintenance, but
> from the current status (and the removal PR [1]), it does not seem
> like it is a burden to continue supporting 0.9. In any case I am +1 to
> remove the support for 0.9, but maybe it is a good idea to just wait
> until the next LTS is decided and do it just after. This way we will
> still cover existing users for some time.
>
> Creating different modules for different versions of KafkaIO does not
> make sense because it is even more complicated than just staying the
> way we are today for not much in return. We better improve the status
> quo by parametrizing our current tests to validate that KafkaIO works
> correctly with the different supported versions (so far we only test
> against version 1.0.0). I filed BEAM-7003 to track this.
>
> [1] https://github.com/apache/beam/pull/8186
> [2] https://issues.apache.org/jira/browse/BEAM-7003
>
> ps. Actually this discussion brings to the table the issue of
> removing/deprecating/changing supported versions on parts of the API
> marked as @Experimental. I will fork a new thread to discuss this.
>
> On Wed, Apr 3, 2019 at 6:53 PM Raghu Angadi  wrote:
> >
> >
> >
> > On Wed, Apr 3, 2019 at 5:46 AM David Morávek 
> wrote:
> >>
> >> I'd say that APIs we use in KafkaIO are pretty much stable since 0.10
> release, all reflection based compatibility adapters seem to be aimed for
> 0.9 release (which is 8 major releases behind current Kafka release).
> >>
> >> We may take an inspiration from Flink's kafka connector, they maintain
> separate maven artifact for all supported Kafka APIs. This may be the best
> approach as we can still share most of the codebase between versions, have
> compile time checks and also run tests against all of the supported
> versions.
> >
> >
> > From that page, Flink also moved to a single Kafka connector for versions
> 1.0.x and newer. Kafka itself seems to have improved compatibility between
> client and broker versions starting with 0.11. Not sure if there is any need now
> to maintain multiple versions of KafkaIO for 0.9.x etc. Are you
> suggesting we should?
> >
> > From Flink's page:
> > "Starting with Flink 1.7, there is a new universal Kafka connector that
> does not track a specific Kafka major version. Rather, it tracks the latest
> version of Kafka at the time of the Flink release.
> >
> > If your Kafka broker version is 1.0.0 or newer, you should use this
> Kafka connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or
> 0.8), you should use the connector corresponding to the broker version."
> >
> >
> >>
> >>
> >> I'm not really comfortable with reflection based adapters as they seem
> fragile and don't provide compile time checks.
> >>
> >> On Tue, Apr 2, 2019 at 11:27 PM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
> >>>
> >>> I withdraw my concern -- checked on info on the cluster I will
> eventually access.  It is on 0.8, so I was speaking too soon.  Can't speak
> to rest of user base.
> >>>
> >>> On Tue, Apr 2, 2019 at 11:03 AM Raghu Angadi  wrote:
> >>>>
> >>>> Thanks to David Morávek for pointing out possible improvement to
> KafkaIO for dropping support for 0.9 since it avoids having a second
> consumer just to fetch latest offsets for backlog.
> >>>>
> >>>> Ideally we should be dropping 0.9 support for next major release, in
> fact better to drop versions before 0.10.1 at the same time. This would
> further reduce reflection based calls for supporting multiple versions. If
> the users still on 0.9 could stay on current stable release of Beam,
> dropping would not affect them. Otherwise, it would be good to hear from
> them about how long we need to keep support for old versions.
> >>>>
> >>>> I don't think it is good idea to have multiple forks of KafkaIO in
> the same repo. If we do go that route, we should fork the entire kafka
> directory and rename the main class KafkaIO_Unmaintained :).
> >>>>
> >>>> IMHO, so far, additional complexity for supporting these versions is
> not that bad. Most of it is isolated to ConsumerSpEL.java &
> ProducerSpEL.java.
>> >>>> My first preference is dropping support for deprecated versions (and
> deprecate a few more versions, maybe till the version that added
> transactions around 0.11.x I thi

Re: kafka 0.9 support

2019-04-03 Thread Raghu Angadi
On Wed, Apr 3, 2019 at 5:46 AM David Morávek 
wrote:

> I'd say that APIs we use in KafkaIO are pretty much stable since 0.10
> release, all reflection based compatibility adapters seem to be aimed for
> 0.9 release (which is 8 major releases behind current Kafka release).
>
> We may take an inspiration from Flink's kafka connector
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html>,
> they maintain separate maven artifact for all supported Kafka APIs. This
> may be the best approach as we can still share most of the codebase between
> versions, have compile time checks and also run tests against all of the
> supported versions.
>

From that page, Flink also moved to a single Kafka connector for versions
1.0.x and newer. Kafka itself seems to have improved compatibility between
client and broker versions starting with 0.11. Not sure if there is any need now
to maintain multiple versions of KafkaIO for 0.9.x etc. Are you
suggesting we should?

From Flink's page:
"Starting with Flink 1.7, there is a new universal Kafka connector that
does not track a specific Kafka major version. Rather, it tracks the latest
version of Kafka at the time of the Flink release.

If your Kafka broker version is 1.0.0 or newer, you should use this Kafka
connector. If you use an older version of Kafka (0.11, 0.10, 0.9, or 0.8),
you should use the connector corresponding to the broker version."



>
> I'm not really comfortable with reflection based adapters
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java>
> as they seem fragile and don't provide compile time checks.
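To make the fragility concrete: a ConsumerSpEL-style adapter has to dispatch on the client's runtime API. The sketch below is illustrative only and is not Beam's actual adapter (which uses Spring EL expressions); it papers over Consumer.assign(), whose declared parameter type changed from List in 0.9 to Collection in 0.10, and neither branch is checked by the compiler:

import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class ConsumerAssignShim {
  /** Calls consumer.assign(partitions) whether the client is 0.9.x or 0.10.x+. */
  static void assign(Consumer<?, ?> consumer, List<TopicPartition> partitions) {
    try {
      // 0.10.x and newer declare assign(Collection<TopicPartition>).
      Method m = consumer.getClass().getMethod("assign", Collection.class);
      m.invoke(consumer, partitions);
    } catch (NoSuchMethodException e) {
      try {
        // 0.9.x declares assign(List<TopicPartition>).
        consumer.getClass().getMethod("assign", List.class).invoke(consumer, partitions);
      } catch (ReflectiveOperationException inner) {
        throw new RuntimeException("Unsupported kafka-clients version", inner);
      }
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }
}

A typo in the method name or a future signature change only surfaces at runtime, which is exactly the missing compile-time check being discussed.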
>
> On Tue, Apr 2, 2019 at 11:27 PM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> I withdraw my concern -- checked on info on the cluster I will eventually
>> access.  It is on 0.8, so I was speaking too soon.  Can't speak to rest of
>> user base.
>>
>> On Tue, Apr 2, 2019 at 11:03 AM Raghu Angadi  wrote:
>>
>>> Thanks to David Morávek for pointing out possible improvement to KafkaIO
>>> for dropping support for 0.9 since it avoids having a second consumer just
>>> to fetch latest offsets for backlog.
>>>
>>> Ideally we should be dropping 0.9 support for next major release, in
>>> fact better to drop versions before 0.10.1 at the same time. This would
>>> further reduce reflection based calls for supporting multiple versions. If
>>> the users still on 0.9 could stay on current stable release of Beam,
>>> dropping would not affect them. Otherwise, it would be good to hear from
>>> them about how long we need to keep support for old versions.
>>>
>>> I don't think it is good idea to have multiple forks of KafkaIO in the
>>> same repo. If we do go that route, we should fork the entire kafka
>>> directory and rename the main class KafkaIO_Unmaintained :).
>>>
>>> IMHO, so far, additional complexity for supporting these versions is not
>>> that bad. Most of it is isolated to ConsumerSpEL.java & ProducerSpEL.java.
>>> My first preference is dropping support for deprecated versions (and
>>> deprecate a few more versions, maybe till the version that added
>>> transactions around 0.11.x I think).
>>>
>>> I haven't looked into what's new in Kafka 2.x. Are there any features
>>> that KafkaIO should take advantage of? I have not noticed our existing code
>>> breaking. We should certainly support the latest releases of Kafka.
>>>
>>> Raghu.
>>>
>>> On Tue, Apr 2, 2019 at 10:27 AM Mingmin Xu  wrote:
>>>
>>>>
>>>> We're still using Kafka 0.10 a lot, similar to 0.9 IMO. To expand
>>>> multiple versions in KafkaIO is quite complex now, and it confuses users
>>>> which is supported / which is not. I would prefer to support Kafka 2.0+
>>>> only in the latest version. For old versions, there're some options:
>>>> 1). document Kafka-Beam support versions, like what we do in
>>>> FlinkRunner;
>>>> 2). maintain separated KafkaIOs for old versions;
>>>>
>>>> 1) would be easy to maintain, and I assume there should be no issue to
>>>> use Beam-Core 3.0 together with KafkaIO 2.0.
>>>>
>>>> Any thoughts?
>>>>
>>>> Mingmin
>>>>
>>>> On Tue, Apr 2, 2019 at 9:56 AM Reuven Lax  wrote:
>>>>
>>>>> KafkaIO is marked as Experimental, and the comment already warns that
>>>>> 0.9 suppor

Re: kafka 0.9 support

2019-04-02 Thread Raghu Angadi
Thanks to David Morávek for pointing out a possible improvement to KafkaIO
from dropping support for 0.9: it avoids having a second consumer just
to fetch the latest offsets for the backlog.

Ideally we should be dropping 0.9 support for the next major release; in fact,
it would be better to drop versions before 0.10.1 at the same time. This would further
reduce the reflection-based calls for supporting multiple versions. If the
users still on 0.9 could stay on the current stable release of Beam, dropping
it would not affect them. Otherwise, it would be good to hear from them about
how long we need to keep support for old versions.

I don't think it is a good idea to have multiple forks of KafkaIO in the same
repo. If we do go that route, we should fork the entire kafka directory and
rename the main class KafkaIO_Unmaintained :).

IMHO, so far, additional complexity for supporting these versions is not
that bad. Most of it is isolated to ConsumerSpEL.java & ProducerSpEL.java.
My first preference is dropping support for deprecated versions (and
deprecate a few more versions, maybe till the version that added
transactions around 0.11.x I think).

I haven't looked into what's new in Kafka 2.x. Are there any features that
KafkaIO should take advantage of? I have not noticed our existing code
breaking. We should certainly support the latest releases of Kafka.

Raghu.

On Tue, Apr 2, 2019 at 10:27 AM Mingmin Xu  wrote:

>
> We're still using Kafka 0.10 a lot, similar to 0.9 IMO. To expand multiple
> versions in KafkaIO is quite complex now, and it confuses users which is
> supported / which is not. I would prefer to support Kafka 2.0+ only in the
> latest version. For old versions, there're some options:
> 1). document Kafka-Beam support versions, like what we do in FlinkRunner;
> 2). maintain separated KafkaIOs for old versions;
>
> 1) would be easy to maintain, and I assume there should be no issue to use
> Beam-Core 3.0 together with KafkaIO 2.0.
>
> Any thoughts?
>
> Mingmin
>
> On Tue, Apr 2, 2019 at 9:56 AM Reuven Lax  wrote:
>
>> KafkaIO is marked as Experimental, and the comment already warns that 0.9
>> support might be removed. I think that if users still rely on Kafka 0.9 we
>> should leave a fork (renamed) of the IO in the tree for 0.9, but we can
>> definitely remove 0.9 support from the main IO if we want, especially if
>> it's complicated changes to that IO. If we do though, we should fail with a
>> clear error message telling users to use the Kafka 0.9 IO.
>>
>> On Tue, Apr 2, 2019 at 9:34 AM Alexey Romanenko 
>> wrote:
>>
>>> > How are multiple versions of Kafka supported? Are they all in one
>>> client, or is there a case for forks like ElasticSearchIO?
>>>
>>> They are supported in one client, but we have an additional “ConsumerSpEL”
>>> adapter which unifies interface differences among different Kafka client
>>> versions (mostly to support the old ones, 0.9-0.10.0).
>>>
>>> On the other hand, we warn user in Javadoc of KafkaIO (which is
>>> Unstable, btw) by the following:
>>> “KafkaIO relies on kafka-clients for all its interactions with the
>>> Kafka cluster. kafka-clients versions 0.10.1 and newer are supported
>>> at runtime. The older versions 0.9.x - 0.10.0.0 are also supported,
>>> but are deprecated and likely be removed in near future.”
>>>
>>> Personally, I’d prefer to have only one unified
>>> client interface, but since people still use Beam with old Kafka instances,
>>> we likely should stick with it till Beam 3.0.
>>>
>>> WDYT?
>>>
>>> On 2 Apr 2019, at 02:27, Austin Bennett 
>>> wrote:
>>>
>>> FWIW --
>>>
>>> On my (desired, not explicitly job-function) roadmap is to tap into a
>>> bunch of our corporate Kafka queues to ingest that data to places I can
>>> use.  Those are 'stuck' on 0.9, with no upgrade in sight (am told the upgrade
>>> path isn't trivial, is very critical flows, and they are scared for it to
>>> break, so it just sits behind firewalls, etc).  But, I wouldn't begin that
>>> for probably at least another quarter.
>>>
>>> I don't contribute to nor understand the burden of maintaining the
>>> support for the older version, so can't reasonably lobby for that continued
>>> pain.
>>>
>>> Anecdotally, this could be a place many enterprises are at (though I
>>> also wonder whether many of the people that would be 'stuck' on such
>>> versions would also have Beam on their current radar).
>>>
>>>
>>> On Mon, Apr 1, 2019 at 2:29 PM Kenneth Knowles  wrote:
>>>
 This could be a backward-incompatible change, though that notion has
 many interpretations. What matters is user pain. Technically if we don't
 break the core SDK, users should be able to use Java SDK >=2.11.0 with
 KafkaIO 2.11.0 forever.

 How are multiple versions of Kafka supported? Are they all in one
 client, or is there a case for forks like ElasticSearchIO?

 Kenn

 On Mon, Apr 1, 2019 at 10:37 AM Jean-Baptiste Onofré 
 wrote:

> +1 to remove 0.9 support.
>
>

Re: [ANNOUNCE] New committer announcement: Raghu Angadi

2019-03-11 Thread Raghu Angadi
Thank you all!

On Mon, Mar 11, 2019 at 6:11 AM Maximilian Michels  wrote:

> Congrats! :)
>
> On 11.03.19 14:01, Etienne Chauchot wrote:
> > Congrats ! Well deserved
> >
> > Etienne
> >
> > Le lundi 11 mars 2019 à 13:22 +0100, Alexey Romanenko a écrit :
> >> My congratulations, Raghu!
> >>
> >>> On 8 Mar 2019, at 10:39, Łukasz Gajowy <lukasz.gaj...@gmail.com> wrote:
> >>>
> >>> Congratulations! :)
> >>>
> >>> On Fri, 8 Mar 2019 at 10:16, Gleb Kanterov <g...@spotify.com> wrote:
> >>>> Congratulations!
> >>>>
> >>>>> On Thu, Mar 7, 2019 at 11:52 PM Michael Luckey <adude3...@gmail.com> wrote:
> >>>>> Congrats Raghu!
> >>>>>
> >>>>>> On Thu, Mar 7, 2019 at 8:06 PM Mark Liu <mark...@google.com> wrote:
> >>>>>> Congrats!
> >>>>>>
> >>>>>>> On Thu, Mar 7, 2019 at 10:45 AM Rui Wang <ruw...@google.com> wrote:
> >>>>>>> Congrats Raghu!
> >>>>>>>
> >>>>>>>
> >>>>>>> -Rui
> >>>>>>>
> >>>>>>>> On Thu, Mar 7, 2019 at 10:22 AM Thomas Weise <t...@apache.org> wrote:
> >>>>>>>> Congrats!
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Mar 7, 2019 at 10:11 AM Tim Robertson <timrobertson...@gmail.com> wrote:
> >>>>>>>>> Congrats Raghu
> >>>>>>>>>
> >>>>>>>>> On Thu, Mar 7, 2019 at 7:09 PM Ahmet Altay <al...@google.com> wrote:
> >>>>>>>>>> Congratulations!
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Mar 7, 2019 at 10:08 AM Ruoyun Huang <ruo...@google.com> wrote:
> >>>>>>>>>>> Thank you Raghu for your contribution!
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Mar 7, 2019 at 9:58 AM Connell O'Callaghan <conne...@google.com> wrote:
> >>>>>>>>>>>> Congratulation Raghu!!! Thank you for sharing Kenn!!!
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Mar 7, 2019 at 9:55 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> >>>>>>>>>>>>> Congrats !
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 7 Mar 2019 at 17:09, Aizhamal Nurmamat kyzy <aizha...@google.com> wrote:
> >>>>>>>>>>>>>> Congratulations, Raghu!!!
> >>>>>>>>>>>>>> On Thu, Mar 7, 2019 at 08:07 Kenneth Knowles <k...@apache.org> wrote:
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please join me and the rest of the Beam PMC in welcoming
> >>>>>>>>>>>>>>> a new committer: Raghu Angadi
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Raghu has been contributing to Beam since early 2016! He
> >>>>>>>>>>>>>>> has continuously improved KafkaIO and supported on the
> >>>>>>>>>>>>>>> user@ list but his community contributions are even more
> >>>>>>>>>>>>>>> extensive, including reviews, dev@ list discussions,
> >>>>>>>>>>>>>>> improvements and ideas across SqsIO, FileIO, PubsubIO,
> >>>>>>>>>>>>>>> and the Dataflow and Samza runners. In consideration of
> >>>>>>>>>>>>>>> Raghu's contributions, the Beam PMC trusts Raghu with the
> >>>>>>>>>>>>>>> responsibilities of a Beam committer [1].
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thank you, Raghu, for your contributions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>
> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
> >
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> 
> >>>>>>>>>>> Ruoyun  Huang
> >>>>>>>>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Cheers,
> >>>> Gleb
> >>
>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-06 Thread Raghu Angadi
On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles  wrote:

> On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi  wrote:
>
>>
>>
>> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:
>>
>>> RE: Kenn's suggestion. I think Raghu looked into something like that, and
>>> something about it didn't work. I don't remember all the details, but I
>>> think there might have been some subtle problem with it that wasn't
>>> obvious. Doesn't mean that there isn't another way to solve that issue.
>>>
>>
>> Two disadvantages:
>> - A transaction in Kafka is tied to a single producer instance. There is
>> no official API to start a txn in one process and access it in another
>> process. Flink's sink uses an internal REST API for this.
>>
>
> Can you say more about how this works?
>

I remember this from the discussion on the PR that added the EOS Kafka sink in Flink in
Aug 2017. From the comment at
https://github.com/apache/flink/pull/4239#issuecomment-321392085 :
"Resuming transactions is not a part of KafkaProducer's API, however
Kafka's REST API allows to do that. However I'm aware that it wasn't an
intention of the authors to do so."
Implementation of 'resumeTransaction()':
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L144

In the case of Flink, this hack is required only when there is a recovery
(since preCommit & postCommit operations run on the same worker in normal
operation). But for Dataflow using GBK, it would have to be handled this
way for every transaction to avoid shuffling the records. A
@RequiresStableInput implementation in Dataflow could actually run the pre- and
post-commit transforms on the same worker, similar to two fused transforms. In that
sense, @RequiresStableInput would essentially be 'fusion break without
shuffle' in Dataflow.

- There is one failure case that I mentioned earlier: if closing the
>> transaction in downstream transform fails, it is data loss, there is no way
>> to replay the upstream transform that wrote the records to Kafka.
>>
>
> With coupling of unrelated failures due to fusion, this is a severe
> problem. I think I see now how 2PC affects this. From my reading, I can't
> see the difference in how Flink works. If the checkpoint finalization
> callback that does the Kafka commit fails, does it invalidate the
> checkpoint so the start transaction + write elements is retried?
>

It doesn't retry since Kafka does not save records, it only saves txn
information. 2PC Javadoc
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L185>
clearly states that recoverAndCommit() should eventually succeed, otherwise
it is data loss.

In practice, committing a txn in Kafka rarely fails, which is why this
does not seem to be an issue for the Flink sink.

Overall though, I don't think the extra shuffle of messages is prohibitively
expensive, and it keeps the design clean. In the EOS Javadoc, I mentioned the bigger
cost could be the extra serialization & deserialization of the records,
which is reasonably simple to avoid by writing serialized byte-records to
KafkaIO.

> Kenn
>
>
>>
>> GBKs don't have major scalability limitations in most runners. An extra GBK
>> is fine in practice for such a sink (at least no one has complained about
>> it yet, though I don't know real usage numbers in practice). Flink's
>> implementation in Beam using @RequiresStableInput does have storage
>> requirements and latency costs that increase with checkpoint interval. I
>> think is still just as useful. Good to see @RequiresStableInput support
>> added to Flink runner in Max's PR.
>>
>>
>>> Hopefully we can make that work. Another possibility if we can't is to
>>> do something special for Flink. Beam allows runners to splice out
>>> well-known transforms with their own implementation. Dataflow already does
>>> that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice
>>> out the Kafka sink with one that uses Flink-specific functionality.
>>> Ideally this would reuse most of the existing Kafka code (maybe we could
>>> refactor just the EOS part into something that could be subbed out).
>>>
>>> Reuven
>>>
>>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels 
>>> wrote:
>>>
>>>> > It would be interesting to see if there's something we could add to
>>>> the Beam model that would create a better 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Raghu Angadi
> prototyping to see how this could be integrated into
>> > Flink. I'm
>> >  >  > currently
>> >  >  > writing a test based on RequiresStableInput.
>> >  >  >
>> >  >  > I found out there are already checks in place at the
>> > Runners to
>> >  >  > throw in
>> >  >  > case transforms use RequiresStableInput and its not
>> >  > supported. However,
>> >  >  > not a single transform actually uses the annotation.
>> >  >  >
>> >  >  > It seems that the effort stopped at some point? Would
>> > it make
>> >  > sense to
>> >  >  > start annotating KafkaExactlyOnceSink with
>> >  > @RequiresStableInput? We
>> >  >  > could then get rid of the whitelist.
>> >  >  >
>> >  >  > -Max
>> >  >  >
>> >  >  > [1]
>> >  >  >
>> >  >
>> >
>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>> >  >  >
>> >  >  >
>> >  >  >
>> >  >  > On 01.03.19 14:28, Maximilian Michels wrote:
>> >  >  >  > Just realized that transactions do not spawn
>> multiple
>> >  > elements in
>> >  >  >  > KafkaExactlyOnceSink. So the proposed solution to
>> stop
>> >  > processing
>> >  >  >  > elements while a snapshot is pending would work.
>> >  >  >  >
>> >  >  >  > It is certainly not optimal in terms of
>> performance for
>> >  > Flink and
>> >  >  > poses
>> >  >  >  > problems when checkpoints take long to complete,
>> but it
>> >  > would be
>> >  >  >  > worthwhile to implement this to make use of the EOS
>> > feature.
>> >  >  >  >
>> >  >  >  > Thanks,
>> >  >  >  > Max
>> >  >  >  >
>> >  >  >  > On 01.03.19 12:23, Maximilian Michels wrote:
>> >  >  >  >> Thanks you for the prompt replies. It's great to
>> > see that
>> >  > there is
>> >  >  >  >> good understanding of how EOS in Flink works.
>> >  >  >  >>
>> >  >  >  >>> This is exactly what RequiresStableInput is
>> > supposed to
>> >  > do. On the
>> >  >  >  >>> Flink runner, this would be implemented by
>> delaying
>> >  > processing
>> >  >  > until
>> >  >  >  >>> the current checkpoint is done.
>> >  >  >  >>
>> >  >  >  >> I don't think that works because we have no
>> > control over
>> >  > the Kafka
>> >  >  >  >> transactions. Imagine:
>> >  >  >  >>
>> >  >  >  >> 1) ExactlyOnceWriter writes records to Kafka and
>> > commits,
>> >  > then
>> >  >  > starts
>> >  >  >  >> a new transaction.
>> >  >  >  >> 2) Flink checkpoints, delaying the processing of
>> >  > elements, the
>> >  >  >  >> checkpoint fails.
>> >  >  >  >> 3) We restore from an old checkpoint and will
>> > start writing
>> >  >  > duplicate
>> >  >  >  >> data to Kafka. The de-duplication that the sink
>> > performs
>> >  > does not
>> >  >  >  >> help, especially because the random shards ids
>> > might be
>> >  > assigned
>> >  >  >  >> differently.
>> >  >  >  >>
>> >  >  >  >> IMHO we have to have control over commit to be
>> able to
>> >  > provide EOS.
>> >

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  wrote:

> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:
>
>> I'm not sure what a hard fail is. I probably have a shallow
>> understanding, but doesn't @RequiresStableInput work for 2PC? The
>> preCommit() phase should establish the transaction and commit() is not
>> called until after checkpoint finalization. Can you describe the way that
>> it does not work a little bit more?
>>
>
> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the
> transaction before this and makes sure it flushes all records in
> preCommit(). So far good.
> - commit is called after checkpoint is persisted. Now, imagine commit()
> fails for some reason. There is no option to rerun the 1st phase to write
> the records again in a new transaction. This is a hard failure for the
> job. In practice Flink might attempt to commit again (not sure how many
> times), which is likely to fail and eventually results in job failure.
>

Btw, just to clarify, the above failure case is not related to Beam or
@RequiresStableInput.
It is how the 2PC in Flink used by its EOS Kafka sink is designed. preCommit()
can't be rerun past the checkpoint, so the commit() phase needs to be
self-sufficient to handle failures and reruns.



>
>
>> Kenn
>>
>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>>
>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles 
>>> wrote:
>>>
>>>> I believe the way you would implement the logic behind Flink's
>>>> KafkaProducer would be to have two steps:
>>>>
>>>> 1. Start transaction
>>>> 2. @RequiresStableInput Close transaction
>>>>
>>>
>>> I see.  What happens if closing the transaction fails in (2)? Flink's
>>> 2PC requires that commit() should never hard fail once preCommit()
>>> succeeds. I think that is cost of not having an extra shuffle. It is
>>> alright since this policy has worked well for Flink so far.
>>>
>>> Overall, it will be great to have @RequiresStableInput support in Flink
>>> runner.
>>>
>>> Raghu.
>>>
>>>> The FlinkRunner would need to insert the "wait until checkpoint
>>>> finalization" logic wherever it sees @RequiresStableInput, which is already
>>>> what it would have to do.
>>>>
>>>> This matches the KafkaProducer's logic - delay closing the transaction
>>>> until checkpoint finalization. This answers my main question, which is "is
>>>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>>> exactly once behavior with the same performance characteristics as native
>>>> Flink checkpoint finalization?"
>>>>
>>>> Kenn
>>>>
>>>> [1] https://github.com/apache/beam/pull/7955
>>>>
>>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>>> transaction once it has completed a checkpoint.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>>>>>> exactly-once
>>>>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>>>>> inside a transform, but I see that the intention was to save users
>>>>>>> from
>>>>>>> surprises.
>>>>>>>
>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>>>> native
>>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>>> transaction once it has completed a checkpoint.
>>>>>>>
>>>>>>
>>>>>>
>>>>>> When we discussed this in Aug 2017, the understanding was that 2
>>>>>> Phase commit utility in Flink used to implement Flink's Kafka EOS could 
>>>>>> not
>>&g

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Raghu Angadi
Aug 2017, the understanding was that 2
> >  >>> Phase commit utility in Flink used to implement Flink's Kafka
> EOS
> >  >>> could not be implemented in Beam's context.
> >  >>
> >  >> That's also my understanding, unless we change the interface.
> >  >>
> >  >>> I don't see how SDF solves this problem..
> >  >>
> >  >> SDF has a checkpoint method which the Runner can call, but I
> think
> >  >> that you are right, that the above problem would be the same.
> >  >>
> >  >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
> > think
> >  >>> that will help many future exactly-once sinks.. and address
> >  >>> fundamental incompatibility between Beam model and Flink's
> > horizontal
> >  >>> checkpointing for such applications.
> >  >>
> >  >> Great :)
> >  >>
> >  >>> The FlinkRunner would need to insert the "wait until checkpoint
> >  >>> finalization" logic wherever it sees @RequiresStableInput,
> > which is
> >  >>> already what it would have to do.
> >  >>
> >  >> I don't think that fixes the problem. See above example.
> >  >>
> >  >> Thanks,
> >  >> Max
> >  >>
> >  >> On 01.03.19 00:04, Raghu Angadi wrote:
> >  >>>
> >  >>>
> >  >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:
> >  >>>
> >  >>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:
> >  >>>
> >  >>> I'm not sure what a hard fail is. I probably have a
> shallow
> >  >>> understanding, but doesn't @RequiresStableInput work
> > for 2PC?
> >  >>> The preCommit() phase should establish the transaction
> and
> >  >>> commit() is not called until after checkpoint
> > finalization. Can
> >  >>> you describe the way that it does not work a little bit
> > more?
> >  >>>
> >  >>>
> >  >>> - preCommit() is called before checkpoint. Kafka EOS in
> > Flink starts
> >  >>> the transaction before this and makes sure it flushes all
> > records in
> >  >>> preCommit(). So far good.
> >  >>> - commit is called after checkpoint is persisted. Now,
> imagine
> >  >>> commit() fails for some reason. There is no option to rerun
> > the 1st
> >  >>> phase to write the records again in a new transaction. This
> > is a
> >  >>> hard failure for the job. In practice Flink might
> > attempt to
> >  >>> commit again (not sure how many times), which is likely to
> > fail and
> >  >>> eventually results in job failure.
> >  >>>
> >  >>>
> >  >>> In Apache Beam, the records could be stored in state, and can be
> >  >>> written inside commit() to work around this issue. It could have
> >  >>> scalability issues if checkpoints are not frequent enough in
> Flink
> >  >>> runner.
> >  >>>
> >  >>> Raghu.
> >  >>>
> >  >>>
> >  >>>
> >  >>> Kenn
> >  >>>
> >  >>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:
> >  >>>
> >  >>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
> >  >>>
> >  >>> I believe the way you would implement the logic
> > behind
> >

Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  wrote:

> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:
>
>> I'm not sure what a hard fail is. I probably have a shallow
>> understanding, but doesn't @RequiresStableInput work for 2PC? The
>> preCommit() phase should establish the transaction and commit() is not
>> called until after checkpoint finalization. Can you describe the way that
>> it does not work a little bit more?
>>
>
> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the
> transaction before this and makes sure it flushes all records in
> preCommit(). So far good.
> - commit is called after checkpoint is persisted. Now, imagine commit()
> fails for some reason. There is no option to rerun the 1st phase to write
> the records again in a new transaction. This is a hard failure for the
> job. In practice Flink might attempt to commit again (not sure how many
> times), which is likely to fail and eventually results in job failure.
>

In Apache Beam, the records could be stored in state, and can be written
inside commit() to work around this issue. It could have scalability issues
if checkpoints are not frequent enough in Flink runner.

Raghu.


>
>
>> Kenn
>>
>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>>
>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles 
>>> wrote:
>>>
>>>> I believe the way you would implement the logic behind Flink's
>>>> KafkaProducer would be to have two steps:
>>>>
>>>> 1. Start transaction
>>>> 2. @RequiresStableInput Close transaction
>>>>
>>>
>>> I see.  What happens if closing the transaction fails in (2)? Flink's
>>> 2PC requires that commit() should never hard fail once preCommit()
>>> succeeds. I think that is cost of not having an extra shuffle. It is
>>> alright since this policy has worked well for Flink so far.
>>>
>>> Overall, it will be great to have @RequiresStableInput support in Flink
>>> runner.
>>>
>>> Raghu.
>>>
>>>> The FlinkRunner would need to insert the "wait until checkpoint
>>>> finalization" logic wherever it sees @RequiresStableInput, which is already
>>>> what it would have to do.
>>>>
>>>> This matches the KafkaProducer's logic - delay closing the transaction
>>>> until checkpoint finalization. This answers my main question, which is "is
>>>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>>> exactly once behavior with the same performance characteristics as native
>>>> Flink checkpoint finalization?"
>>>>
>>>> Kenn
>>>>
>>>> [1] https://github.com/apache/beam/pull/7955
>>>>
>>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>>> transaction once it has completed a checkpoint.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>>>>>> exactly-once
>>>>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>>>>> inside a transform, but I see that the intention was to save users
>>>>>>> from
>>>>>>> surprises.
>>>>>>>
>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>>>> native
>>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>>> transaction once it has completed a checkpoint.
>>>>>>>
>>>>>>
>>>>>>
>>>>>> When we discussed this in Aug 2017, the understanding was that 2
>>>>>> Phase commit utility in Flink used to implement Flink's Kafka EOS could 
>>>>>> not
>>>>>> be implemented in Beam's context.
>>>>>

Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:

> I'm not sure what a hard fail is. I probably have a shallow understanding,
> but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should
> establish the transaction and commit() is not called until after checkpoint
> finalization. Can you describe the way that it does not work a little bit
> more?
>

- preCommit() is called before checkpoint. Kafka EOS in Flink starts the
transaction before this and makes sure it flushes all records in
preCommit(). So far good.
- commit is called after checkpoint is persisted. Now, imagine commit()
fails for some reason. There is no option to rerun the 1st phase to write
the records again in a new transaction. This is a hard failure for the
job. In practice Flink might attempt to commit again (not sure how many
times), which is likely to fail and eventually results in job failure.
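For reference, the kafka-clients transaction API being discussed looks roughly like this (a generic sketch, not Flink's or Beam's code; the broker address, transactional.id and topic are made up). The constraint driving the discussion is that beginTransaction/send and commitTransaction must happen on the same producer instance, and a commit that fails after the checkpoint cannot simply re-run the write phase:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-sink-shard-0"); // must be stable across restarts
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.initTransactions();        // fences older producers with the same id
      try {
        producer.beginTransaction();      // "preCommit" side: write the records
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        // In Flink's sink, commitTransaction() runs in the checkpoint-complete
        // callback; if it fails there, the records cannot be rewritten because
        // the writing phase is already past the checkpoint.
        producer.commitTransaction();
      } catch (KafkaException e) {
        // A real implementation distinguishes ProducerFencedException, which
        // requires closing the producer rather than aborting.
        producer.abortTransaction();
      }
    }
  }
}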


> Kenn
>
> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>
>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles  wrote:
>>
>>> I believe the way you would implement the logic behind Flink's
>>> KafkaProducer would be to have two steps:
>>>
>>> 1. Start transaction
>>> 2. @RequiresStableInput Close transaction
>>>
>>
>> I see.  What happens if closing the transaction fails in (2)? Flink's 2PC
>> requires that commit() should never hard fail once preCommit() succeeds. I
>> think that is cost of not having an extra shuffle. It is alright since this
>> policy has worked well for Flink so far.
>>
>> Overall, it will be great to have @RequiresStableInput support in Flink
>> runner.
>>
>> Raghu.
>>
>>> The FlinkRunner would need to insert the "wait until checkpoint
>>> finalization" logic wherever it sees @RequiresStableInput, which is already
>>> what it would have to do.
>>>
>>> This matches the KafkaProducer's logic - delay closing the transaction
>>> until checkpoint finalization. This answers my main question, which is "is
>>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>> exactly once behavior with the same performance characteristics as native
>>> Flink checkpoint finalization?"
>>>
>>> Kenn
>>>
>>> [1] https://github.com/apache/beam/pull/7955
>>>
>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>>>
>>>>
>>>>
>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:
>>>>
>>>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>> transaction once it has completed a checkpoint.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>>>>> exactly-once
>>>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>>>> inside a transform, but I see that the intention was to save users
>>>>>> from
>>>>>> surprises.
>>>>>>
>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>> transaction once it has completed a checkpoint.
>>>>>>
>>>>>
>>>>>
>>>>> When we discussed this in Aug 2017, the understanding was that 2 Phase
>>>>> commit utility in Flink used to implement Flink's Kafka EOS could not be
>>>>> implemented in Beam's context.
>>>>> See  this message
>>>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>>>> that dev thread. Has anything changed in this regard? The whole thread is
>>>>> relevant to this topic and worth going through.
>>>>>
>>>>
>>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink
>>>> runner would probably want to directly use notifySnapshotComplete in order
>>>> to implement @RequiresStableInput.
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> A checkpoint is realized by sending barriers through all channels
>>

Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles  wrote:

> I believe the way you would implement the logic behind Flink's
> KafkaProducer would be to have two steps:
>
> 1. Start transaction
> 2. @RequiresStableInput Close transaction
>

I see.  What happens if closing the transaction fails in (2)? Flink's 2PC
requires that commit() should never hard fail once preCommit() succeeds. I
think that is the cost of not having an extra shuffle. It is alright since this
policy has worked well for Flink so far.

Overall, it will be great to have @RequiresStableInput support in Flink
runner.

Raghu.
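A hedged sketch of the two-step shape discussed above: a shard's records are grouped upstream (e.g. by a GroupByKey keyed on shard), and the commit step is marked @RequiresStableInput so the runner only hands it input that has been checkpointed and can be replayed identically on retry. This is not KafkaIO's actual sink code; the element type, configs and topic are made up, and a real sink would cache producers per shard instead of creating one per batch:

import java.util.Properties;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/** Commit step: writes one shard's (stable) batch in a single Kafka transaction. */
class TransactionalWriteFn extends DoFn<KV<Integer, Iterable<KV<byte[], byte[]>>>, Void> {
  private final String bootstrapServers;
  private final String topic;
  private final String sinkGroupId;

  TransactionalWriteFn(String bootstrapServers, String topic, String sinkGroupId) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
    this.sinkGroupId = sinkGroupId;
  }

  @RequiresStableInput // the runner must checkpoint the input before calling this
  @ProcessElement
  public void processElement(ProcessContext c) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    // One transactional.id per shard, stable across retries and restarts.
    props.put("transactional.id", sinkGroupId + "-" + c.element().getKey());
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.initTransactions();
      producer.beginTransaction();
      for (KV<byte[], byte[]> record : c.element().getValue()) {
        producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()));
      }
      // Because the input is stable, a failed commit can be retried later with
      // exactly the same batch in a fresh transaction.
      producer.commitTransaction();
    }
  }
}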

> The FlinkRunner would need to insert the "wait until checkpoint
> finalization" logic wherever it sees @RequiresStableInput, which is already
> what it would have to do.
>
> This matches the KafkaProducer's logic - delay closing the transaction
> until checkpoint finalization. This answers my main question, which is "is
> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
> exactly once behavior with the same performance characteristics as native
> Flink checkpoint finalization?"
>
> Kenn
>
> [1] https://github.com/apache/beam/pull/7955
>
> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>
>>
>>
>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:
>>
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>> transaction once it has completed a checkpoint.
>>>
>>>
>>>
>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>> inside a transform, but I see that the intention was to save users from
>>>> surprises.
>>>>
>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>> transaction once it has completed a checkpoint.
>>>>
>>>
>>>
>>> When we discussed this in Aug 2017, the understanding was that 2 Phase
>>> commit utility in Flink used to implement Flink's Kafka EOS could not be
>>> implemented in Beam's context.
>>> See  this message
>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>> that dev thread. Has anything changed in this regard? The whole thread is
>>> relevant to this topic and worth going through.
>>>
>>
>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner
>> would probably want to directly use notifySnapshotComplete in order to
>> implement @RequiresStableInput.
>>
>>>
>>>
>>>>
>>>> A checkpoint is realized by sending barriers through all channels
>>>> starting from the source until reaching all sinks. Every operator
>>>> persists its state once it has received a barrier on all its input
>>>> channels, it then forwards it to the downstream operators.
>>>>
>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>>
>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>> GroupByKey -> ExactlyOnceWriter
>>>>
>>>> As I understood, Spark or Dataflow use the GroupByKey stages to persist
>>>> the input. That is not required in Flink to be able to take a
>>>> consistent
>>>> snapshot of the pipeline.
>>>>
>>>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>>>> What we would need to support EOS is a way to tell the
>>>> ExactlyOnceWriter
>>>> (a DoFn) to commit once a checkpoint has completed.
>>>
>>> I know that the new version of SDF supports checkpointing which should
>>>> solve this issue. But there is still a lot of work to do to make this
>>>> reality.
>>>>
>>>
>>> I don't see how SDF solves this problem.. May be pseudo code would make
>>> more clear.  But if helps, that is great!
>>>
>>> So I think it would make sense to think about a way to make KafkaIO's
>>>> EOS more accessible to Runners which support a different way of
>>>> checkpointing.
>>>>
>>>
>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>> that will help many future exactly-once sinks.. and address fundamental
>>> incompatibility between Beam model and Flink's horizontal checkpointing for
>>> such applications.
>>>
>>> Raghu.
>>>
>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>> defining an annotation only manifests the conceptual difference between
>>>> the Runners.
>>>>
>>>>
>>>> [1]
>>>>
>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>> [2]
>>>>
>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>> [3]
>>>>
>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>
>>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.



On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels  wrote:

> Hi,
>
> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
> semantics (EOS). I think it is questionable to exclude Runners from
> inside a transform, but I see that the intention was to save users from
> surprises.
>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.
>


When we discussed this in Aug 2017, the understanding was that 2 Phase
commit utility in Flink used to implement Flink's Kafka EOS could not be
implemented in Beam's context.
See this message
<https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that
dev thread. Has anything changed in this regard? The whole thread is
relevant to this topic and worth going through.


>
> A checkpoint is realized by sending barriers through all channels
> starting from the source until reaching all sinks. Every operator
> persists its state once it has received a barrier on all its input
> channels, it then forwards it to the downstream operators.
>
> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>
> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
> GroupByKey -> ExactlyOnceWriter
>
> As I understood, Spark or Dataflow use the GroupByKey stages to persist
> the input. That is not required in Flink to be able to take a consistent
> snapshot of the pipeline.
>
> Basically, for Flink we don't need any of that magic that KafkaIO does.
> What we would need to support EOS is a way to tell the ExactlyOnceWriter
> (a DoFn) to commit once a checkpoint has completed.

I know that the new version of SDF supports checkpointing which should
> solve this issue. But there is still a lot of work to do to make this
> reality.
>

I don't see how SDF solves this problem. Maybe pseudocode would make it
clearer. But if it helps, that is great!

So I think it would make sense to think about a way to make KafkaIO's
> EOS more accessible to Runners which support a different way of
> checkpointing.
>

Absolutely. I would love to support EOS in KafkaIO for Flink. I think that
will help many future exactly-once sinks, and address the fundamental
incompatibility between Beam model and Flink's horizontal checkpointing for
such applications.

Raghu.
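For context, this is roughly how the sink under discussion is enabled from user code (a hedged sketch; the broker, topic and withEOS arguments are illustrative, and only whitelisted runners accept the EOS mode):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosWriteExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of(KV.of(1L, "a"), KV.of(2L, "b"))
            .withCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())))
        .apply(
            KafkaIO.<Long, String>write()
                .withBootstrapServers("localhost:9092")
                .withTopic("results")
                .withKeySerializer(LongSerializer.class)
                .withValueSerializer(StringSerializer.class)
                // numShards and a sink group id; the runner whitelist applies here.
                .withEOS(5, "eos-example-sink"));
    p.run().waitUntilFinish();
  }
}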


> Cheers,
> Max
>
> PS: I found this document about RequiresStableInput [3], but IMHO
> defining an annotation only manifests the conceptual difference between
> the Runners.
>
>
> [1]
>
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> [2]
>
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> [3]
>
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>


Dev contact for KafkaIO

2019-01-09 Thread Raghu Angadi
Hi Everyone,

Last Friday was my last day at Google and the Dataflow team. I had a great time
and learnt a lot from working on both Apache Beam and Dataflow.

My new job at a startup is not directly related to Apache Beam and I will
not be able to spend a lot of time on support or development of KafkaIO.

Alexey Romanenko has been quite active in Apache Beam community and I had
many productive interactions with him on KafkaIO pull requests. I think he
will be a great lead to take KafkaIO forward. Many of you have made contributions
to KafkaIO and I am sure you will continue to do so.

I have been a contributor to Apache projects for a long time, and I do think it
is important for us to support end users as best as we can. I will be
following Apache Beam developments and will be available for questions and
reviews of pull requests or feature discussions.

Thanks,
Raghu.


Re: KafkaIO - Deadletter output

2018-10-25 Thread Raghu Angadi
On Thu, Oct 25, 2018 at 10:47 AM Chamikara Jayalath 
wrote:

>
>
> On Thu, Oct 25, 2018 at 10:41 AM Raghu Angadi  wrote:
>
>>
>> On Thu, Oct 25, 2018 at 10:28 AM Chamikara Jayalath 
>> wrote:
>>
>>> Not sure if I understand why this would require Kafka to behave as two
>>> independent sources.
>>>
>>
>>
>>
>>> Won't setting a non-negative-infinity timestamp (for example processing
>>> time) for failed records be enough ?
>>>
>>
>> Well that's what I suggested and that is what a user seems to have done.
>> The question is: what if that is not what we want, and we don't want to mix these
>> two together (at least from my reading of Luke's and Kenn's comments, which
>> could be off).
>>
>
> Sorry I meant setting it at the SDF itself (not from a ParDo) when we have
> SDF-based KafkaIO.
>

You can set the timestamp even now, it does not need KafkaIO to be ported
to SDF. It was always possible to do that in KafkaIO. See Jozef Vilcek's
message above.


>
>
>>
>>
>
>>
>>>
>>> Also (at least at some point) there were discussions on supporting SDFs
>>> to report different watermarks for different outputs. More details are
>>> available here:
>>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>>>
>>> - Cham
>>>
>>>
>>>>
>>>> Raghu.
>>>>
>>>>>
>>>>> It could even assign a timestamp that makes more logical sense in a
>>>>>> particular application.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> Forgive me if this is naive or missing something, but here are my
>>>>>>> thoughts on these alternatives:
>>>>>>>
>>>>>>> (0) Timestamp has to be pulled out in the source to control the
>>>>>>> watermark. Luke's point is important.
>>>>>>>
>>>>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>>>>> enough, then watermark will advance and they will all be dropped. That 
>>>>>>> will
>>>>>>> not allow output to a dead-letter queue.
>>>>>>>
>>>>>>> (2) If you have always min_timestamp records, or if bad records are
>>>>>>> frequent, the watermark will never advance. So windows/aggregations 
>>>>>>> would
>>>>>>> never be considered complete. Triggers could be used to get output 
>>>>>>> anyhow,
>>>>>>> but it would never be a final answer. I think it is not in the spirit of
>>>>>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>>>>>> runner.
>>>>>>>
>>>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>>>> that parses from a bytes source. If, for example, a JSON record cannot 
>>>>>>> be
>>>>>>> parsed to the expected schema - like maybe an avro record got in the
>>>>>>> stream, or the JSON doesn't match the expected schema - it is output 
>>>>>>> as-is
>>>>>>> to a user-specified dead letter queue. I think this same level of 
>>>>>>> support
>>>>>>> is also required for records that cannot have timestamps extracted in an
>>>>>>> unbounded source.
>>>>>>>
>>>>>>> In an SDF I think the function has enough control to do it all in
>>>>>>> "userland", so Cham is right on here.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That depends on the users pipeline and how watermark advancement of
>>>>>>>> the source may impact elements becoming droppably late if they are 
>>>>>>>> emitted
>>>>>>>> with the minimum timestamp.
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I see.
>

Re: KafkaIO - Deadletter output

2018-10-25 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 11:54 PM Reuven Lax  wrote:
[...]

> KafkaIO has a few in-built policies for watermark and timestamp that cover
>> most use cases (including server time, which has a benefit of providing
>> perfect watermark). It also gives fairly complete control on these to users
>> if they choose to. I think it looks reasonable for a policy to base its
>> watermark only on parsable records, and ignore unparsable records
>> w.r.t watermark calculation.
>>
>
> But then doesn't that force the user to set max allowed lateness to
> infinity, otherwise these records will be dropped?
>

True, especially if there are any aggregations that need to be performed on the
dead-letter output. I think I understand the issue better now. Can it be stated
something like this:

What we ideally want is two independent sources: one for the main output,
and one for the deadletter output, each with its own timestamp and
watermark characteristics.

If so, yes, I don't think it is feasible with the UnboundedSource API. How does
SDF enable this without special features from Beam?

Raghu.

>
> It could even assign a timestamp that makes more logical sense in a
>> particular application.
>>
>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles  wrote:
>>
>>> Forgive me if this is naive or missing something, but here are my
>>> thoughts on these alternatives:
>>>
>>> (0) Timestamp has to be pulled out in the source to control the
>>> watermark. Luke's point is important.
>>>
>>> (1) If bad records get min_timestamp, and they occur infrequently
>>> enough, then watermark will advance and they will all be dropped. That will
>>> not allow output to a dead-letter queue.
>>>
>>> (2) If you always have min_timestamp records, or if bad records are
>>> frequent, the watermark will never advance. So windows/aggregations would
>>> never be considered complete. Triggers could be used to get output anyhow,
>>> but it would never be a final answer. I think it is not in the spirit of
>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>> runner.
>>>
>>> In SQL there is an actual "dead letter" option when creating a table
>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>> parsed to the expected schema - like maybe an avro record got in the
>>> stream, or the JSON doesn't match the expected schema - it is output as-is
>>> to a user-specified dead letter queue. I think this same level of support
>>> is also required for records that cannot have timestamps extracted in an
>>> unbounded source.
>>>
>>> In an SDF I think the function has enough control to do it all in
>>> "userland", so Cham is right on here.
>>>
>>> Kenn
>>>
>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik  wrote:
>>>
>>>> That depends on the users pipeline and how watermark advancement of the
>>>> source may impact elements becoming droppably late if they are emitted with
>>>> the minimum timestamp.
>>>>
>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi 
>>>> wrote:
>>>>
>>>>> I see.
>>>>>
>>>>> What I meant was to return min_timestamp for bad records in the
>>>>> timestamp handler passed to KafkaIO itself, and correct timestamp for
>>>>> parsable records. That should work too, right?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>>>>>
>>>>>> Yes, that would be fine.
>>>>>>
>>>>>> The user could then use a ParDo which outputs to a DLQ for things it
>>>>>> can't parse the timestamp for and use outputWithTimestamp[1] for 
>>>>>> everything
>>>>>> else.
>>>>>>
>>>>>> 1:
>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi 
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>> application is fine with what it means)?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik 
>>>>>>> wrote:
>>>>>>>
>>>>>&g

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
I see.

What I meant was to return min_timestamp for bad records in the timestamp
handler passed to KafkaIO itself, and correct timestamp for parsable
records. That should work too, right?
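
In code, that suggestion is roughly the sketch below (extractTimestamp() is a
hypothetical parser that throws on bad records; bootstrap and topics are as in the
original pipeline, and note that TimestampPolicyFactory.withTimestampFn() is
described elsewhere in this thread as not being a public API):

// (imports: org.apache.beam.sdk.io.kafka.*, org.apache.beam.sdk.transforms.SerializableFunction,
//  org.apache.beam.sdk.transforms.windowing.BoundedWindow, org.joda.time.Instant,
//  org.apache.kafka.common.serialization.StringDeserializer)

// Sketch: bad records get the minimum timestamp; a ParDo right after the read
// can then drop them or send them to a dead-letter output.
SerializableFunction<KafkaRecord<String, String>, Instant> timestampFn =
    rec -> {
      try {
        return extractTimestamp(rec.getKV().getValue()); // hypothetical, may throw
      } catch (Exception e) {
        return BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
    };

KafkaIO.<String, String>read()
    .withBootstrapServers(bootstrap)
    .withTopics(topics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn));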

On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:

> Yes, that would be fine.
>
> The user could then use a ParDo which outputs to a DLQ for things it can't
> parse the timestamp for and use outputWithTimestamp[1] for everything else.
>
> 1:
> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>
> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>
>> Thanks. So returning min timestamp is OK, right (assuming the application
>> is fine with what it means)?
>>
>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>>
>>> All records in Apache Beam have a timestamp. The default timestamp is
>>> the min timestamp defined here:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>>>>
>>>>> You would have to return min timestamp for all records otherwise the
>>>>> watermark may have advanced and you would be outputting records that are
>>>>> droppably late.
>>>>>
>>>>
>>>> That would be fine I guess. What’s the timestamp for a record that
>>>> doesn’t have one?
>>>>
>>>>
>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>>>>> wrote:
>>>>>
>>>>>> To be clear, returning min_timestamp for unparsable records should
>>>>>> not affect the watermark.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>>>>>> wrote:
>>>>>>
>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>> redirected by the ParDo after that.
>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
>>>>>>> is this pipeline defined under kafkaio package?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>>>> UnboundedSource, how would they control the watermark in a downstream 
>>>>>>>> ParDo?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ah nice. Yeah, if user can return full bytes instead of applying
>>>>>>>>>> a function that would result in an exception,  this can be extracted 
>>>>>>>>>> by a
>>>>>>>>>> ParDo down the line.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>>>>> there is a good reason not to.
>>>>>>>>> Given that, do we think Beam should provide a transform that makes
>>>>>>>>> it simpler to handle deadletter output? I think there was a thread 
>>>>>>>>> about it
>>>>>>>>> in the past.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>
>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple after
>>>>>>>>>>> that you ca

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
Thanks. So returning min timestamp is OK, right (assuming the application is
fine with what it means)?

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:

> All records in Apache Beam have a timestamp. The default timestamp is the
> min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:
>
>>
>>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>>
>>> You would have to return min timestamp for all records otherwise the
>>> watermark may have advanced and you would be outputting records that are
>>> droppably late.
>>>
>>
>> That would be fine I guess. What’s the timestamp for a record that
>> doesn’t have one?
>>
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>>> wrote:
>>>
>>>> To be clear, returning min_timestamp for unparsable records should not
>>>> affect the watermark.
>>>>
>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>>>> wrote:
>>>>
>>>>> How about returning min_timestamp? They would be dropped or redirected
>>>>> by the ParDo after that.
>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>>>>> this pipeline defined under kafkaio package?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>>>>>
>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>> UnboundedSource, how would they control the watermark in a downstream 
>>>>>> ParDo?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>>>>>> wrote:
>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>> chamik...@google.com> wrote:
>>>>>>>
>>>>>>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>>>>>>> function that would result in an exception,  this can be extracted by a
>>>>>>>> ParDo down the line.
>>>>>>>>
>>>>>>>
>>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>>> there is a good reason not to.
>>>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>>>> simpler to handle deadletter output? I think there was a thread about 
>>>>>>> it in
>>>>>>> the past.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> As Raghu said,
>>>>>>>>>
>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple after
>>>>>>>>> that you can extract your Success Records (TupleTag) and your 
>>>>>>>>> DeadLetter
>>>>>>>>> records(TupleTag) and do whatever you want with them.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>>>>>>>>> 05:18:
>>>>>>>>>
>>>>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>> explicitly in a ParDo, which gives complete control on how to handle 
>>>>>>>>>> record
>>>>>>>>>> errors. This is what I would do if I need to in my pipeline.
>>>>>>>>>>
>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>> convenient for users in many such scenarios. This is simpler than 
>>>>>>>>>> each
>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>&

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:

> You would have to return min timestamp for all records otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.
>

That would be fine I guess. What’s the timestamp for a record that doesn’t
have one?


> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:
>
>> To be clear, returning min_timestamp for unparsable records should not
>> affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:
>>
>>> How about returning min_timestamp? They would be dropped or redirected by
>>> the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>>> this pipeline defined under kafkaio package?
>>>
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>>>
>>>> In this case, the user is attempting to handle errors when parsing the
>>>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>>>> how would they control the watermark in a downstream ParDo?
>>>>
>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>>>> wrote:
>>>>
>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>>>>> function that would result in an exception,  this can be extracted by a
>>>>>> ParDo down the line.
>>>>>>
>>>>>
>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>> there is a good reason not to.
>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>> simpler to handle deadletter output? I think there was a thread about it 
>>>>> in
>>>>> the past.
>>>>>
>>>>>
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>
>>>>>>> As Raghu said,
>>>>>>>
>>>>>>> Just apply a regular ParDo and return a PCollectionTuple after that
>>>>>>> you can extract your Success Records (TupleTag) and your DeadLetter
>>>>>>> records(TupleTag) and do whatever you want with them.
>>>>>>>
>>>>>>>
>>>>>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>>>>>>> 05:18:
>>>>>>>
>>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>>> explicitly in a ParDo, which gives complete control on how to handle 
>>>>>>>> record
>>>>>>>> errors. This is what I would do if I need to in my pipeline.
>>>>>>>>
>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>> convenient for users in many such scenarios. This is simpler than each
>>>>>>>> source supporting it explicitly.
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>>>>>> not something that can easily be supported. We might be able to 
>>>>>>>>> support
>>>>>>>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>>>>>>>
>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>&g

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
To be clear, returning min_timestamp for unparsable records should not
affect the watermark.

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:

> How about returning min_timestamp? They would be dropped or redirected by
> the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
> this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>
>> In this case, the user is attempting to handle errors when parsing the
>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>> how would they control the watermark in a downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
>>> wrote:
>>>
>>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>>> function that would result in an exception,  this can be extracted by a
>>>> ParDo down the line.
>>>>
>>>
>>> KafkaIO does return bytes, and I think most sources should, unless there
>>> is a good reason not to.
>>> Given that, do we think Beam should provide a transform that makes it
>>> simpler to handle deadletter output? I think there was a thread about it in
>>> the past.
>>>
>>>
>>>>
>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>> jcgarc...@gmail.com> wrote:
>>>>
>>>>> As Raghu said,
>>>>>
>>>>> Just apply a regular ParDo and return a PCollectionTuple after that
>>>>> you can extract your Success Records (TupleTag) and your DeadLetter
>>>>> records(TupleTag) and do whatever you want with them.
>>>>>
>>>>>
>>>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>>>>> 05:18:
>>>>>
>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>> explicitly in a ParDo, which gives complete control on how to handle 
>>>>>> record
>>>>>> errors. This is what I would do if I need to in my pipeline.
>>>>>>
>>>>>> If there is a transform in Beam that does this, it could be
>>>>>> convenient for users in many such scenarios. This is simpler than each
>>>>>> source supporting it explicitly.
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>>>> not something that can easily be supported. We might be able to support
>>>>>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>>>>>
>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is there a way to get a Deadletter Output from a pipeline that
>>>>>>>>> uses a KafkaIO
>>>>>>>>> connector for its input? As
>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>> only a SerializableFunction and not a ParDo, how would I be able
>>>>>>>>> to produce a
>>>>>>>>> Deadletter output from it?
>>>>>>>>>
>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>> input:
>>>>>>>>>
>>>>>>>>> pipeline.apply(
>>>>>>>>>   KafkaIO.read()
>>>>>>>>> .withBootstrapServers(bootstrap)
>>>>>>>>> .withTopics(topics)
>>>>>>>>> .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>> .updateConsumerProperties(
>>>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>> inputMessagesConfig))
>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>>>>> "earliest"))
>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>>>> "beam-consumers"))
>>>>>>>>>
>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>>>>>>>>> "true"))
>>>>>>>>> .withTimestampPolicyFactory(
>>>>>>>>> TimestampPolicyFactory.withTimestampFn(
>>>>>>>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>> .withReadCommitted()
>>>>>>>>> .commitOffsetsInFinalize())
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> and I'd like to get deadletter outputs when my timestamp extraction
>>>>>>>>> fails.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tobi
>>>>>>>>>
>>>>>>>>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
How about returning min_timestamp? They would be dropped or redirected by
the ParDo after that.
Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is this
pipeline defined under kafkaio package?

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
>> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>> function that would result in an exception,  this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless there
>> is a good reason not to.
>> Given that, do we think Beam should provide a transform that makes it
>> simpler to handle deadletter output? I think there was a thread about it in
>> the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
>>> wrote:
>>>
>>>> As Raghu said,
>>>>
>>>> Just apply a regular ParDo and return a PCollectionTuple after that you
>>>> can extract your Success Records (TupleTag) and your DeadLetter
>>>> records(TupleTag) and do whatever you want with them.
>>>>
>>>>
>>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>>>>
>>>>> User can read serialized bytes from KafkaIO and deserialize explicitly
>>>>> in a ParDo, which gives complete control on how to handle record errors.
>>>>> This is what I would do if I need to in my pipeline.
>>>>>
>>>>> If there is a transform in Beam that does this, it could be convenient
>>>>> for users in many such scenarios. This is simpler than each source
>>>>> supporting it explicitly.
>>>>>
>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>>> not something that can easily be supported. We might be able to support
>>>>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>>>>
>>>>> So feel free to create a feature request JIRA for this.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>>>> noticed by whoever may know best.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Is there a way to get a Deadletter Output from a pipeline that uses
>>>>>>>> a KafkaIO
>>>>>>>> connector for its input? As
>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>> only a SerializableFunction and not a ParDo, how would I be able to
>>>>>>>> produce a
>>>>>>>> Deadletter output from it?
>>>>>>>>
>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>> input:
>>>>>>>>
>>>>>>>> pipeline.apply(
>>>>>>>>   KafkaIO.read()
>>>>>>>> .withBootstrapServers(bootstrap)
>>>>>>>> .withTopics(topics)
>>>>>>>> .withKeyDeserializer(StringDeserializer.class)
>>>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>> .updateConsumerProperties(
>>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>> inputMessagesConfig))
>>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>>>> "earliest"))
>>>>>>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>>> "beam-consumers"))
>>>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>>>>>> "true"))
>>>>>>>> .withTimestampPolicyFactory(
>>>>>>>> TimestampPolicyFactory.withTimestampFn(
>>>>>>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>> .withReadCommitted()
>>>>>>>> .commitOffsetsInFinalize())
>>>>>>>>
>>>>>>>>
>>>>>>>> and I'd like to get deadletter outputs when my timestamp extraction
>>>>>>>> fails.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tobi
>>>>>>>>
>>>>>>>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by a
> ParDo down the line.
>

KafkaIO does return bytes, and I think most sources should, unless there is
a good reason not to.
Given that, do we think Beam should provide a transform that makes it
simpler to handle deadletter output? I think there was a thread about it in
the past.


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple after that you
>> can extract your Success Records (TupleTag) and your DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize explicitly
>>> in a ParDo, which gives complete control on how to handle record errors.
>>> This is what I would do if I need to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be convenient
>>> for users in many such scenarios. This is simpler than each source
>>> supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably not
>>>> something that can easily be supported. We might be able to support similar
>>>> features when we have Kafka on top of Splittable DoFn though.
>>>>
>>> So feel free to create a feature request JIRA for this.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>>>>
>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>> noticed by whoever may know best.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Is there a way to get a Deadletter Output from a pipeline that uses a
>>>>>> KafkaIO
>>>>>> connector for its input? As
>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>> only a SerializableFunction and not a ParDo, how would I be able to
>>>>>> produce a
>>>>>> Deadletter output from it?
>>>>>>
>>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>>
>>>>>> pipeline.apply(
>>>>>>   KafkaIO.read()
>>>>>> .withBootstrapServers(bootstrap)
>>>>>> .withTopics(topics)
>>>>>> .withKeyDeserializer(StringDeserializer.class)
>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>> .updateConsumerProperties(
>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>> inputMessagesConfig))
>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>> "earliest"))
>>>>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>> "beam-consumers"))
>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>>>> "true"))
>>>>>> .withTimestampPolicyFactory(
>>>>>> TimestampPolicyFactory.withTimestampFn(
>>>>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>> .withReadCommitted()
>>>>>> .commitOffsetsInFinalize())
>>>>>>
>>>>>>
>>>>>> and I'd like to get deadletter outputs when my timestamp extraction
>>>>>> fails.
>>>>>>
>>>>>> Best,
>>>>>> Tobi
>>>>>>
>>>>>>


Re: KafkaIO - Deadletter output

2018-10-23 Thread Raghu Angadi
User can read serialized bytes from KafkaIO and deserialize explicitly in a
ParDo, which gives complete control over how to handle record errors. This is
what I would do if I need to in my pipeline.

If there is a transform in Beam that does this, it could be convenient for
users in many such scenarios. This is simpler than each source supporting
it explicitly.
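
For illustration, that pattern could look roughly like the sketch below (MyEvent
and parsePayload() are placeholders for the application's type and parser, and
bootstrap/topics are as in the pipeline quoted further down this thread):

// (imports: org.apache.beam.sdk.io.kafka.KafkaIO, org.apache.beam.sdk.transforms.*,
//  org.apache.beam.sdk.values.*, org.apache.kafka.common.serialization.ByteArrayDeserializer)
final TupleTag<MyEvent> parsedTag = new TupleTag<MyEvent>() {};
final TupleTag<KV<byte[], byte[]>> deadLetterTag = new TupleTag<KV<byte[], byte[]>>() {};

// Read raw bytes so that deserialization failures surface in our own ParDo,
// not inside the source.
PCollection<KV<byte[], byte[]>> raw =
    pipeline.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withoutMetadata());

PCollectionTuple parsed =
    raw.apply(
        ParDo.of(new DoFn<KV<byte[], byte[]>, MyEvent>() {
              @ProcessElement
              public void process(ProcessContext c) {
                try {
                  c.output(parsePayload(c.element().getValue())); // placeholder parser
                } catch (Exception e) {
                  c.output(deadLetterTag, c.element()); // route bad records to the DLQ output
                }
              }
            })
            .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

PCollection<MyEvent> events = parsed.get(parsedTag);
PCollection<KV<byte[], byte[]>> deadLetters = parsed.get(deadLetterTag); // e.g. write to a DLQ sink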

On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
wrote:

> Given that KafkaIO uses the UnboundedSource framework, this is probably not
> something that can easily be supported. We might be able to support similar
> features when we have Kafka on top of Splittable DoFn though.
>
So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>
>> This is a great question. I've added the dev list to be sure it gets
>> noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> Is there a way to get a Deadletter Output from a pipeline that uses a
>>> KafkaIO
>>> connector for its input? As `TimestampPolicyFactory.withTimestampFn()`
>>> takes
>>> only a SerializableFunction and not a ParDo, how would I be able to
>>> produce a
>>> Deadletter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>
>>> pipeline.apply(
>>>   KafkaIO.read()
>>> .withBootstrapServers(bootstrap)
>>> .withTopics(topics)
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>> .updateConsumerProperties(
>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>> inputMessagesConfig))
>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>> "earliest"))
>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>> "beam-consumers"))
>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>> "true"))
>>> .withTimestampPolicyFactory(
>>> TimestampPolicyFactory.withTimestampFn(
>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>> .withReadCommitted()
>>> .commitOffsetsInFinalize())
>>>
>>>
>>> and I'd like to get deadletter outputs when my timestamp extraction fails.
>>>
>>> Best,
>>> Tobi
>>>
>>>


Re: When will the Dataflow Runner invoke the UnboundedReader.close() method and decode the checkpoint?

2018-10-18 Thread Raghu Angadi
Decoding the checkpoint is required only while resuming a reader. Typically
this happens while reopening the reader after it is closed (for any
reason), while restarting the pipeline with a previous checkpoint (as in a
Dataflow update), when the work moves to a different worker, or if the
worker restarts, etc.

Dataflow typically does not close its readers unless it is really required.
Opening a reader is typically expensive. Currently a reader is closed when
the work moves (see above) or the reader is not read from for 1 minute.
Note that the 1-minute inactivity timeout is not the same as an idle reader
without any input. Beam keeps polling the reader for more input. A
reader might not be polled in some resource-constrained pipelines or if a lot of
higher-priority work gets scheduled (e.g. when a large window fires).

If you want to force a reader to close and resume in your test, throw an
IOException. Dataflow will restart the reader in streaming. Obviously this
is only for testing.

On Thu, Oct 18, 2018 at 9:32 AM flyisland  wrote:

> Hi gurus,
>
> I've added some debug output in my UnboundedReader and Checkpoint classes,
> and noticed that the Dataflow Runner keeps encoding the checkpoint objects,
> but never decode it, and never invoke the UnboundedReader.close() method in
> my testing.
>
> However, the Direct Runner will decode the checkpoint object just after
> encoding it, and it will also invoke the UnboundedReader.close() method and
> start a new reader every now and then.
>
> Google Dataflow will be the future production environment, so I'd like
> to know when the Dataflow Runner will invoke the UnboundedReader.close()
> method and decode the checkpoint?
>
> Thanks in advance!
>
> Island Chen
>


Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-21 Thread Raghu Angadi
> This in-house built socket server could accept multiple clients, but only
send messages to the first-connected client, and will send messages to the
second client if the first one disconnected.

The server sending messages only to the first client connection is quite critical.
Even if you use the Source API, which honors the 'Setup()' JavaDoc, it is not
enough in your case. Note that it says it reuses, but that does not guarantee a
single DoFn instance or when it actually calls TearDown(). It is on a
best-effort basis. The work could move to a different worker and the DoFn
instance on the earlier worker can live for a long time. So if you held the
connection to the server until TearDown() is called, you could be inadvertently
blocking reads from the DoFn on the new worker. If you want to keep the
connection open across bundles, you need some way to close an idle
connection asynchronously (alternatively, your service might have a timeout to
close an idle client connection, which is much better). Since you can't
afford to wait till TearDown(), you might as well have a singleton
connection that gets closed after some idle time.
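
As a rough sketch of that last idea (SocketClient is a stand-in for whatever
client the in-house server provides): keep one shared connection per worker JVM
and close it from a background task once it has been idle for a while:

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: per-JVM shared connection, closed asynchronously when idle so the
// server can start serving a client on another worker.
class SharedConnection {
  private static final long IDLE_TIMEOUT_MILLIS = 60_000;
  private static final ScheduledExecutorService closer =
      Executors.newSingleThreadScheduledExecutor();
  private static SocketClient client;   // hypothetical client type
  private static long lastUsedMillis;

  static {
    closer.scheduleAtFixedRate(SharedConnection::closeIfIdle, 10, 10, TimeUnit.SECONDS);
  }

  static synchronized SocketClient get() throws IOException {
    if (client == null) {
      client = SocketClient.connect();  // hypothetical connect call
    }
    lastUsedMillis = System.currentTimeMillis();
    return client;
  }

  private static synchronized void closeIfIdle() {
    if (client != null
        && System.currentTimeMillis() - lastUsedMillis > IDLE_TIMEOUT_MILLIS) {
      client.close();
      client = null;
    }
  }
}

A DoFn would then call SharedConnection.get() in its ProcessElement method
instead of holding its own connection until TearDown().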

Assuming you need to ack on the same connection that served the records, the
finalize() functionality in the UnboundedSource API is important in your case. You
can use the UnboundedSource API for now.

On Thu, Sep 20, 2018 at 8:25 PM flyisland  wrote:

> Hi Reuven,
>
> There is no explicit ID in the message itself, and whether there is
> information that can be used as an ID depends on the use case.
>
> On Fri, Sep 21, 2018 at 11:05 AM Reuven Lax  wrote:
>
>> Is there information in the message that can be used as an id, that can
>> be used for deduplication?
>>
>> On Thu, Sep 20, 2018 at 6:36 PM flyisland  wrote:
>>
>>> Hi Lukasz,
>>>
>>> With the current API we provided, messages cannot be acked from a
>>> different client.
>>>
>>> The server will re-send messages to the reconnected client if those
>>> messages were not acked. So there'll be duplicate messages, but with a
>>> "redeliver times" property in the header.
>>>
>>> Thanks for your helpful information, I'll check the UnboundedSources,
>>> thanks!
>>>
>>>
>>>
>>> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik  wrote:
>>>
 Are duplicate messages ok?

 Can you ack messages from a different client or are messages sticky to
 a single client (e.g. if one client loses connection, when it reconnects
 can it ack messages it received or are those messages automatically
 replayed)?

 UnboundedSources are the only current "source" type that supports
 finalization callbacks[1] that you will need to ack messages and
 deduplication[2]. SplittableDoFn will support both of these features but
 are not there yet.

 1:
 https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
 2:
 https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93


 On Wed, Sep 19, 2018 at 8:31 PM flyisland  wrote:

> Hi Lukasz,
>
> This socket server is like an MQTT server, it has queues inside it and
> the client should ack each message.
>
> > Is receiving and processing each message reliably important or is it
> ok to drop messages when things fail?
> A: Reliable is important, no messages should be lost.
>
> > Is there a message acknowledgement system or can you request a
> position within the message stream (e.g. send all messages from position X
> when connecting and if for whatever reason you need to reconnect you can
> say send messages from position X to replay past messages)?
> A: Client should ack each message it received, and the server will
> delete the acked message. If the client broke and the server does not
> receive an ack, the server will re-send the message to the client when it is
> online again. And there is no "position" concept like in Kafka.
>
>
> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik  wrote:
>
>> Before getting into what you could use and the current state of
>> SplittableDoFn and its supported features, I was wondering what 
>> reliability
>> guarantees does the socket server have around messages?
>>
>> Is receiving and processing each message reliably important or is it
>> ok to drop messages when things fail?
>> Is there a message acknowledgement system or can you request a
>> position within the message stream (e.g. send all messages from position 
>> X
>> when connecting and if for whatever reason you need to reconnect you can
>> say send messages from position X to replay past messages)?
>>
>>
>>
>>
>> On Tue, Sep 18, 2018 at 5:00 PM flyisland 
>> wrote:
>>
>>>
>>> Hi Gurus,
>>>
>>> I'm trying to create an IO connector to fetch data from a socket
>>> server from 

Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-20 Thread Raghu Angadi
Congrats Kenn!

On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci  wrote:

> Hi everyone --
> It is with great pleasure that I announce that at today's meeting of the
> Foundation's Board of Directors, the Board has appointed Kenneth Knowles as
> the second chair of the Apache Beam project.
>
> Kenn has served on the PMC since its inception, and is very active and
> effective in growing the community. His exemplary posts have been cited in
> other projects. I'm super happy to have Kenn accepted the nomination, and
> I'm confident that he'll serve with distinction.
>
> As for myself, I'm not going anywhere. I'm still around and will be as
> active as I have recently been. Thrilled to be able to pass the baton to
> such a key member of this community and to have less administrative work to
> do ;-).
>
> Please join me in welcoming Kenn to his new role, and I ask that you
> support him as much as possible. As always, please let me know if you have
> any questions.
>
> Davor
>


Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-17 Thread Raghu Angadi
Thanks Charles. Sent cherry-pick for KafkaIO fix:
https://github.com/apache/beam/pull/6421


On Mon, Sep 17, 2018 at 10:18 AM Charles Chen  wrote:

> Luke, Maximillian, Raghu, can you please propose cherry-pick PRs to the
> release-2.7.0 for your issues and add me as a reviewer (@charlesccychen)?
>
> Romain, JB: is there any way I can help with debugging the issue you're
> facing so we can unblock the release?
>
> On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi  wrote:
>
>> I would like to propose one more cherry-pick for RC2:
>> https://github.com/apache/beam/pull/6391
>> This is a KafkaIO bug fix. Once a user hits this bug, there is no easy
>> workaround for them, especially on Dataflow. The only workaround in Dataflow
>> is to restart or reload the job.
>>
>> The fix itself is fairly safe and is tested.
>> Raghu.
>>
>> On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> Perhaps it could help, but I ran a simple WordCount (built with Beam 2.7)
>>> on a YARN/Spark (HDP Sandbox) cluster and it worked fine for me.
>>>
>>> On 14 Sep 2018, at 06:56, Romain Manni-Bucau 
>>> wrote:
>>>
>>> Hi Charles,
>>>
>>> I didn't get enough time to check deeply but it is clearly a dependency
>>> issue and it is not in beam spark runner itself but in another transitive
>>> module of beam. It does not happen in the existing Spark tests because none of
>>> them are in a cluster (even just with 1 worker) but this seems to be a
>>> regression since 2.6 works OOTB.
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>> <http://rmannibucau.wordpress.com/> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>
>>>
>>> Le jeu. 13 sept. 2018 à 22:15, Charles Chen  a écrit :
>>>
>>>> Romain and JB, can you please add the results of your investigations
>>>> into the errors you've seen above?  Given that the existing SparkRunner
>>>> tests pass for this RC, and that the integration test you ran is in another
>>>> repo that is not continuously tested with Beam, it is not clear how we
>>>> should move forward and whether this is a blocking issue, unless we can
>>>> find a root cause in Beam.
>>>>
>>>> On Wed, Sep 12, 2018 at 2:08 AM Etienne Chauchot 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> on a performance and functional regression stand point I see no
>>>>> regression:
>>>>>
>>>>> I looked at nexmark graphs "output pcollection size" and "execution
>>>>> time" around release cut date on dataflow, spark, flink and direct runner
>>>>> in batch and streaming modes. There seems to be no regression.
>>>>>
>>>>> Etienne
>>>>>
>>>>> Le mardi 11 septembre 2018 à 12:25 -0700, Charles Chen a écrit :
>>>>>
>>>>> The SparkRunner validation test (here:
>>>>> https://beam.apache.org/contribute/release-guide/#run-validation-tests)
>>>>> passes on my machine.  It looks like we are likely missing test coverage
>>>>> where Romain is hitting issues.
>>>>>
>>>>> On Tue, Sep 11, 2018 at 12:15 PM Ahmet Altay  wrote:
>>>>>
>>>>> Could anyone else help with looking at these issues earlier?
>>>>>
>>>>> On Tue, Sep 11, 2018 at 12:03 PM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>> I'm running this main [1] through this IT [2]. It was working fine for
>>>>> ~1 year but 2.7.0 broke it. I didn't investigate more but can have a look
>>>>> later this month if it helps.
>>>>>
>>>>> [1]
>>>>> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/main/java/org/talend/sdk/component/beam/it/clusterserialization/Main.java
>>>>> [2]
>>>>> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/test/java/org/talend/sdk/component/beam/it/SerializationOverClusterIT.java
>>>>

Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-14 Thread Raghu Angadi
I would like to propose one more cherry-pick for RC2:
https://github.com/apache/beam/pull/6391
This is a KafkaIO bug fix. Once a user hits this bug, there is no easy
workaround for them, especially on Dataflow. The only workaround in Dataflow is to
restart or reload the job.

The fix itself is fairly safe and is tested.
Raghu.

On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko 
wrote:

> Perhaps it could help, but I ran a simple WordCount (built with Beam 2.7) on
> a YARN/Spark (HDP Sandbox) cluster and it worked fine for me.
>
> On 14 Sep 2018, at 06:56, Romain Manni-Bucau 
> wrote:
>
> Hi Charles,
>
> I didn't get enough time to check deeply but it is clearly a dependency
> issue and it is not in beam spark runner itself but in another transitive
> module of beam. It does not happen in the existing Spark tests because none of
> them are in a cluster (even just with 1 worker) but this seems to be a
> regression since 2.6 works OOTB.
>
> Romain Manni-Bucau
> @rmannibucau  |  Blog
>  | Old Blog
>  | Github
>  | LinkedIn
>  | Book
> 
>
>
> Le jeu. 13 sept. 2018 à 22:15, Charles Chen  a écrit :
>
>> Romain and JB, can you please add the results of your investigations into
>> the errors you've seen above?  Given that the existing SparkRunner tests
>> pass for this RC, and that the integration test you ran is in another repo
>> that is not continuously tested with Beam, it is not clear how we should
>> move forward and whether this is a blocking issue, unless we can find a
>> root cause in Beam.
>>
>> On Wed, Sep 12, 2018 at 2:08 AM Etienne Chauchot 
>> wrote:
>>
>>> Hi all,
>>>
>>> on a performance and functional regression stand point I see no
>>> regression:
>>>
>>> I looked at nexmark graphs "output pcollection size" and "execution
>>> time" around release cut date on dataflow, spark, flink and direct runner
>>> in batch and streaming modes. There seems to be no regression.
>>>
>>> Etienne
>>>
>>> Le mardi 11 septembre 2018 à 12:25 -0700, Charles Chen a écrit :
>>>
>>> The SparkRunner validation test (here:
>>> https://beam.apache.org/contribute/release-guide/#run-validation-tests)
>>> passes on my machine.  It looks like we are likely missing test coverage
>>> where Romain is hitting issues.
>>>
>>> On Tue, Sep 11, 2018 at 12:15 PM Ahmet Altay  wrote:
>>>
>>> Could anyone else help with looking at these issues earlier?
>>>
>>> On Tue, Sep 11, 2018 at 12:03 PM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>> I'm running this main [1] through this IT [2]. It was working fine for ~1
>>> year but 2.7.0 broke it. I didn't investigate more but can have a look later
>>> this month if it helps.
>>>
>>> [1]
>>> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/main/java/org/talend/sdk/component/beam/it/clusterserialization/Main.java
>>> [2]
>>> https://github.com/Talend/component-runtime/blob/master/component-runtime-beam/src/it/serialization-over-cluster/src/test/java/org/talend/sdk/component/beam/it/SerializationOverClusterIT.java
>>>
>>> Le mar. 11 sept. 2018 20:54, Charles Chen  a écrit :
>>>
>>> Romain: can you give more details on the failure you're encountering,
>>> i.e. how you are performing this validation?
>>>
>>> On Tue, Sep 11, 2018 at 9:36 AM Jean-Baptiste Onofré 
>>> wrote:
>>>
>>> Hi,
>>>
>>> weird, I didn't have it on Beam samples. Let me try to reproduce and I
>>> will create the Jira.
>>>
>>> Regards
>>> JB
>>>
>>> On 11/09/2018 11:44, Romain Manni-Bucau wrote:
>>> > -1, seems spark integration is broken (tested with spark 2.3.1 and
>>> 2.2.1):
>>> >
>>> > 18/09/11 11:33:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
>>> 0, RMANNIBUCAU, executor 0): java.lang.ClassCastException: cannot assign
>>> instance of scala.collection.immutable.List$SerializationProxy to
>>> field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_
>>> of type scala.collection.Seq in instance of
>>> org.apache.spark.rdd.MapPartitionsRDD
>>> >   at
>>> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
>>> >
>>> >
>>> > Also the issue Lukasz identified is important even if workarounds can
>>> be
>>> > put in place so +1 to fix it as well if possible.
>>> >
>>> > Romain Manni-Bucau
>>> > @rmannibucau  | Blog
>>> >  | Old Blog
>>> >  | Github
>>> >  | LinkedIn
>>> >  | Book
>>> > <
>>> https://www.packtpub.com/application-development/java-ee-8-high-performa

Re: Donating the Dataflow Worker code to Apache Beam

2018-09-13 Thread Raghu Angadi
On Thu, Sep 13, 2018 at 12:53 PM Romain Manni-Bucau 
wrote:

> If usable by itself without google karma (can you use a worker without
> dataflow itself?) it sounds awesome otherwise it sounds weird IMHO.
>

Can you elaborate a bit more on using the worker without Dataflow? I
essentially see it as part of the Dataflow runner. A runner is specific to
a platform.

I am a Googler, but commenting as a community member.

Raghu.

>
> Le jeu. 13 sept. 2018 21:36, Kai Jiang  a écrit :
>
>> +1 (non googler)
>>
>> big help for transparency and for future runners.
>>
>> Best,
>> Kai
>>
>> On Thu, Sep 13, 2018, 11:45 Xinyu Liu  wrote:
>>
>>> Big +1 (non-googler).
>>>
>>> From Samza Runner's perspective, we are very happy to see dataflow
>>> worker code so we can learn and compete :).
>>>
>>> Thanks,
>>> Xinyu
>>>
>>> On Thu, Sep 13, 2018 at 11:34 AM Suneel Marthi 
>>> wrote:
>>>
>>>> +1 (non-googler)
>>>>
>>>> This is a great 👍 move
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Sep 13, 2018, at 2:25 PM, Tim Robertson 
>>>> wrote:
>>>>
>>>> +1 (non googler)
>>>> It sounds pragmatic, helps with transparency should issues arise and
>>>> enables more people to fix.
>>>>
>>>>
>>>> On Thu, Sep 13, 2018 at 8:15 PM Dan Halperin 
>>>> wrote:
>>>>
>>>>> From my perspective as a (non-Google) community member, huge +1.
>>>>>
>>>>> I don't see anything bad for the community about open sourcing more of
>>>>> the probably-most-used runner. While the DirectRunner is probably still 
>>>>> the
>>>>> most referential implementation of Beam, can't hurt to see more working
>>>>> code. Other runners or runner implementors can refer to this code if they
>>>>> want, and ignore it if they don't.
>>>>>
>>>>> In terms of having more code and tests to support, well, that's par
>>>>> for the course. Will this change make the things that need to be done to
>>>>> support them more obvious? (E.g., "this PR is blocked because someone at
>>>>> Google on Dataflow team has to fix something" vs "this PR is blocked
>>>>> because the Apache Beam code in foo/bar/baz is failing, and anyone who can
>>>>> see the code can fix it"). The latter seems like a clear win for the
>>>>> community.
>>>>>
>>>>> (As long as the code donation is handled properly, but that's
>>>>> completely orthogonal and I have no reason to think it wouldn't be.)
>>>>>
>>>>> Thanks,
>>>>> Dan
>>>>>
>>>>> On Thu, Sep 13, 2018 at 11:06 AM Lukasz Cwik  wrote:
>>>>>
>>>>>> Yes, I'm specifically asking the community for opinions as to whether
>>>>>> it should be accepted or not.
>>>>>>
>>>>>> On Thu, Sep 13, 2018 at 10:51 AM Raghu Angadi 
>>>>>> wrote:
>>>>>>
>>>>>>> This is terrific!
>>>>>>>
>>>>>>> Is this thread asking for opinions from the community about whether it should
>>>>>>> be accepted? Assuming the Google-side decision is made to contribute, big +1
>>>>>>> from me to include it next to the other runners.
>>>>>>>
>>>>>>> On Thu, Sep 13, 2018 at 10:38 AM Lukasz Cwik 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> At Google we have been importing the Apache Beam code base and
>>>>>>>> integrating it with the Google portion of the codebase that supports 
>>>>>>>> the
>>>>>>>> Dataflow worker. This process is painful as we regularly are making
>>>>>>>> breaking API changes to support libraries related to running portable
>>>>>>>> pipelines (and sometimes in other places as well). This has made it
>>>>>>>> sometimes difficult for PR changes to make changes without either 
>>>>>>>> breaking
>>>>>>>> something for Google or waiting for a Googler to make the change 
>>>>>>>> internally
>>>>>>>> (e.g. dependency updates).
>>>>>>>>
>>>>>>>> This code is very similar to the other integrations that exist for
>>>>>>>> runners such as Flink/Spark/Apex/Samza. It is an adaption layer that 
>>>>>>>> sits
>>>>>>>> on top of an execution engine. There is no super secret awesome stuff 
>>>>>>>> as
>>>>>>>> this code was already publicly visible in the past when it was part of 
>>>>>>>> the
>>>>>>>> Google Cloud Dataflow github repo[1].
>>>>>>>>
>>>>>>>> Process wise the code will need to get approval from Google to be
>>>>>>>> donated and for it to go through the code donation process but before 
>>>>>>>> we
>>>>>>>> attempt to do that, I was wondering whether the community would object 
>>>>>>>> to
>>>>>>>> adding this code to the master branch?
>>>>>>>>
>>>>>>>> The up side is that people can make breaking changes and fix it for
>>>>>>>> all runners. It will also help Googlers contribute more to the 
>>>>>>>> portability
>>>>>>>> story as it will remove the burden of doing the code import (wasted 
>>>>>>>> time)
>>>>>>>> and it will allow people to develop in master (can have the whole 
>>>>>>>> project
>>>>>>>> loaded in a single IDE).
>>>>>>>>
>>>>>>>> The downsides are that this will represent more code and unit tests
>>>>>>>> to support.
>>>>>>>>
>>>>>>>> 1:
>>>>>>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/hotfix_v1.2/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker
>>>>>>>>
>>>>>>>


Re: Donating the Dataflow Worker code to Apache Beam

2018-09-13 Thread Raghu Angadi
This is terrific!

Is this thread asking for opinions from the community about whether it should be
accepted? Assuming the Google-side decision is made to contribute, big +1 from
me to include it next to the other runners.

On Thu, Sep 13, 2018 at 10:38 AM Lukasz Cwik  wrote:

> At Google we have been importing the Apache Beam code base and integrating
> it with the Google portion of the codebase that supports the Dataflow
> worker. This process is painful as we regularly are making breaking API
> changes to support libraries related to running portable pipelines (and
> sometimes in other places as well). This has made it sometimes difficult
> for PR changes to make changes without either breaking something for Google
> or waiting for a Googler to make the change internally (e.g. dependency
> updates).
>
> This code is very similar to the other integrations that exist for runners
> such as Flink/Spark/Apex/Samza. It is an adaption layer that sits on top of
> an execution engine. There is no super secret awesome stuff as this code
> was already publicly visible in the past when it was part of the Google
> Cloud Dataflow github repo[1].
>
> Process wise the code will need to get approval from Google to be donated
> and for it to go through the code donation process but before we attempt to
> do that, I was wondering whether the community would object to adding this
> code to the master branch?
>
> The up side is that people can make breaking changes and fix it for all
> runners. It will also help Googlers contribute more to the portability
> story as it will remove the burden of doing the code import (wasted time)
> and it will allow people to develop in master (can have the whole project
> loaded in a single IDE).
>
> The downsides are that this will represent more code and unit tests to
> support.
>
> 1:
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/hotfix_v1.2/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker
>


Re: [DISCUSS] Versioning, Hadoop related dependencies and enterprise users

2018-08-28 Thread Raghu Angadi
Thanks for the IO versioning summary.
KafkaIO's policy of 'let the user decide the exact version at runtime' has been
quite useful so far. How feasible is that for other connectors?

Also, KafkaIO does not limit itself to the minimum features available across
all the supported versions. Some of the features (e.g. server-side
timestamps) are disabled based on the runtime Kafka version. The unit tests
currently run with a single recent version. Integration tests could certainly
use multiple versions. With some more effort in writing tests, we could
run the unit tests against multiple versions as well.
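
As an illustration of that kind of runtime check (a sketch only, not necessarily
what KafkaIO does internally), one can probe for a method that only exists in
newer kafka-clients versions:

// Sketch: detect whether the kafka-clients jar on the classpath supports record
// timestamps (added in 0.10), and disable the feature otherwise.
static boolean supportsRecordTimestamps() {
  try {
    org.apache.kafka.clients.consumer.ConsumerRecord.class.getMethod("timestamp");
    return true;
  } catch (NoSuchMethodException e) {
    return false; // pre-0.10 client: fall back, e.g. to processing time
  }
}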

Raghu.

IO versioning
> * Elasticsearch. We delayed the move to version 6 until we heard of
> more active users needing it (more deployments). We support 2.x and
> 5.x (but 2.x went recently EOL). Support for 6.x is in progress.
> * SolrIO, stable version is 7.x, LTS is 6.x. We support only 5.x
> because most big data distributions still use 5.x (however 5.x has
> been EOL).
> * KafkaIO uses version 1.x but Kafka recently moved to 2.x, however
> most of the deployments of Kafka use earlier versions than 1.x. This
> module uses a single version with the kafka client as a provided
> dependency and so far it works (but we don’t have multi version
> tests).
>


On Tue, Aug 28, 2018 at 8:38 AM Ismaël Mejía  wrote:

> I think we should refine the strategy on dependencies discussed
> recently. Sorry to come late with this (I did not follow closely the
> previous discussion), but the current approach is clearly not in line
> with the industry reality (at least not for IO connectors + Hadoop +
> Spark/Flink use).
>
> A really proactive approach to dependency updates is a good practice
> for the core dependencies we have e.g. Guava, Bytebuddy, Avro,
> Protobuf, etc, and of course for the case of cloud based IOs e.g. GCS,
> Bigquery, AWS S3, etc. However when we talk about self hosted data
> sources or processing systems this gets more complicated and I think
> we should be more flexible and do this case by case (and remove these
> from the auto update email reminder).
>
> Some open source projects have at least three maintained versions:
> - LTS – maps to what most of the people have installed (or the big
> data distributions use) e.g. HBase 1.1.x, Hadoop 2.6.x
> - Stable – current recommended version. HBase 1.4.x, Hadoop 2.8.x
> - Next – latest release. HBase 2.1.x Hadoop 3.1.x
>
> Following the most recent versions can be good to be close to the
> current development of other projects and some of the fixes, but these
> versions are commonly not deployed for most users and adopting a LTS
> or stable only approach won't satisfy all cases either. To understand
> why this is complex let’s see some historical issues:
>
> IO versioning
> * Elasticsearch. We delayed the move to version 6 until we heard of
> more active users needing it (more deployments). We support 2.x and
> 5.x (but 2.x went recently EOL). Support for 6.x is in progress.
> * SolrIO, stable version is 7.x, LTS is 6.x. We support only 5.x
> because most big data distributions still use 5.x (however 5.x has
> been EOL).
> * KafkaIO uses version 1.x but Kafka recently moved to 2.x, however
> most of the deployments of Kafka use earlier versions than 1.x. This
> module uses a single version with the kafka client as a provided
> dependency and so far it works (but we don’t have multi version
> tests).
>
> Runners versioning
> * The move to Spark 1 to Spark 2 was decided after evaluating the
> tradeoffs between maintaining multiple version support and to have
> breaking changes with the issues of maintaining multiple versions.
> This is a rare case but also with consequences. This dependency is
> provided but we don't actively test issues on version migration.
> * Flink moved to version 1.5, introducing incompatibility in
> checkpointing (discussed recently and with not yet consensus on how to
> handle).
>
> As you can see, it seems really hard to have a solution that fits all
> cases. Probably the only rule that I see from this list is that we
> should upgrade versions for connectors that have been deprecated or
> arrived to the EOL (e.g. Solr 5.x, Elasticsearch 2.x).
>
> For the case of the provided dependencies I wonder if as part of the
> tests we should provide tests with multiple versions (note that this
> is currently blocked by BEAM-4087).
>
> Any other ideas or opinions to see how we can handle this? What other
> people in the community think ? (Notice that this can have relation
> with the ongoing LTS discussion.
>
>
> On Tue, Aug 28, 2018 at 10:44 AM Tim Robertson
>  wrote:
> >
> > Hi folks,
> >
> > I'd like to revisit the discussion around our versioning policy
> specifically for the Hadoop ecosystem and make sure we are aware of the
> implications.
> >
> > As an example our policy today would have us on HBase 2.1 and I have
> reminders to address this.
> >
> > However, currently the versions of HBase in the major Hadoop distros are:
> >
> >  - Cloudera 5 on HBase 1.2 (Cloude

Re: SQS source

2018-07-23 Thread Raghu Angadi
On Mon, Jul 23, 2018 at 2:25 PM John Rudolf Lewis 
wrote:

> So I guess I can add a timestamp to the message attributes when I receive
> it from SQS, since there is no such built-in property.
> But what triggers finalizeCheckpoint to be called? So far in my testing, I
> never see that method get called, and hence my messages keep getting
> redelivered.
>

Are you testing with the direct runner? It should be called after the first
stage processes the elements (i.e. once the checkpoint mark is durably
committed by the runner).

Raghu.
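
To make that concrete, here is a minimal sketch of the pattern being discussed,
assuming the AWS SDK v1 SQS client (class and field names are made up, not taken
from an existing Beam connector): the checkpoint mark carries the receipt handles
read since the last checkpoint and deletes them when the runner finalizes it.

import com.amazonaws.services.sqs.AmazonSQS;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;

// Sketch only: holds the receipt handles read since the last checkpoint and
// deletes them once the runner calls finalizeCheckpoint(), i.e. after the
// first fused stage has durably committed the checkpoint mark.
class SqsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
  private final String queueUrl;
  private final List<String> receiptHandles;
  // Not serialized; a real implementation would re-create the client (and
  // provide a coder for the mark) when it is decoded on a worker.
  private transient AmazonSQS sqs;

  SqsCheckpointMark(AmazonSQS sqs, String queueUrl, List<String> receiptHandles) {
    this.sqs = sqs;
    this.queueUrl = queueUrl;
    this.receiptHandles = receiptHandles;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Acknowledge the messages so SQS does not redeliver them after the
    // visibility timeout expires.
    for (String handle : receiptHandles) {
      sqs.deleteMessage(queueUrl, handle);
    }
  }
}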


>
>
> On Thu, Jul 19, 2018 at 5:26 PM, Raghu Angadi  wrote:
>
>> A timestamp for a message is fundamental to an element in a PCollection.
>> What do you mean by not knowing the timestamp of a message?
>> There is a finalizeCheckpoint API [1] in UnboundedSource. Does that help?
>> PubSub is also very similar: a message needs to be acked within a timeout,
>> otherwise it will be redelivered to one of the consumers. Pubsub messages
>> are acked inside finalize().
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>>
>>
>> On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis 
>> wrote:
>>
>>> hmm... made lots of progress on this today. But need help understanding
>>> something
>>>
>>> UnboundedSource seems to assume that there is some guarantee of message
>>> ordering, and that you can get the timestamp of the current message. Using
>>> UnboundedSource.CheckpointMark to help advance the offset. Seems to work ok
>>> for any source that supports those assumptions. But SQS does not work this
>>> way.
>>>
>>> With a standard SQS queue, there is no guarantee of ordering and there
>>> is no timestamp for a message.  With SQS, one needs to call the delete api
>>> using the receipt handle from the message to acknowledge receipt of a
>>> message and prevent its redelivery after the visibility timeout has expired.
>>>
>>> I'm not sure how to adapt these two patterns and would welcome
>>> suggestions.
>>>
>>>
>>>
>>> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré 
>>> wrote:
>>>
>>>> Thx John !
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>>>> > Thank you.
>>>> >
>>>> > I've created a jira ticket to add SQS and have assigned it to
>>>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>>>> >
>>>> > Modified the documentation to show it as in-progress:
>>>> > https://github.com/apache/beam/pull/5995
>>>> >
>>>> > And will be starting my work
>>>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>>>> >
>>>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>> >
>>>> > Agree with Ismaël.
>>>> >
>>>> > I would be more than happy to help on this one (as I contributed
>>>> on AMQP
>>>> > and JMS IOs ;)).
>>>> >
>>>> > Regards
>>>> > JB
>>>> >
>>>> > On 19/07/2018 10:39, Ismaël Mejía wrote:
>>>> > > Thanks for your interest John, it would be a really nice
>>>> contribution
>>>> > > to add SQS support.
>>>> > >
>>>> > > Some context on the kinesis stuff:
>>>> > >
>>>> > > The reason why kinesis is still in a separate module is more
>>>> related
>>>> > > to a licensing problem. Kinesis uses some native libraries that
>>>> are
>>>> > > published under a not 100% apache compatible license and we are
>>>> not
>>>> > > allowed to shade and republish them but it seems there is a
>>>> workaround
>>>> > > now, for more details see
>>>> > > https://issues.apache.org/jira/browse/BEAM-3549
>>>> > <https://issues.apache.org/jira/browse/BEAM-3549>
>>>> > > In any case if to use SQS you only need the Apache licensed
>>>> aws-sdk
>>>> > > deps it is ok (and a good idea) if you put it in the
>>>> > > amazon-web-services module.
>>

Re: SQS source

2018-07-19 Thread Raghu Angadi
A timestamp for a message is fundamental to an element in a PCollection.
What do you mean by not knowing the timestamp of a message?
There is a finalizeCheckpoint API [1] in UnboundedSource. Does that help?
PubSub is also very similar: a message needs to be acked within a timeout,
otherwise it will be redelivered to one of the consumers. Pubsub messages
are acked inside finalize().

[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
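
As a minimal sketch of the timestamp side of this (assuming the AWS SDK v1
message type; the helper and fallback are made up), an SQS reader could derive
getCurrentTimestamp() from the standard "SentTimestamp" system attribute:

import com.amazonaws.services.sqs.model.Message;
import org.joda.time.Instant;

// Sketch: derive an element timestamp from the SQS "SentTimestamp" system
// attribute (epoch millis as a string); fall back to the time the reader
// received the message when the attribute was not requested.
class SqsTimestamps {
  static Instant timestampOf(Message message, Instant receiveTime) {
    String sentMillis = message.getAttributes().get("SentTimestamp");
    return sentMillis != null ? new Instant(Long.parseLong(sentMillis)) : receiveTime;
  }
}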

On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis 
wrote:

> hmm... made lots of progress on this today. But need help understanding
> something
>
> UnboundedSource seems to assume that there is some guarantee of message
> ordering and that you can get the timestamp of the current message, using
> UnboundedSource.CheckpointMark to help advance the offset. That seems to work
> ok for any source that supports those assumptions, but SQS does not work this
> way.
>
> With a standard SQS queue, there is no guarantee of ordering and there is
> no timestamp for a message. With SQS, one needs to call the delete API
> using the receipt handle from the message to acknowledge receipt of a
> message and prevent its redelivery after the visibility timeout has expired.
>
> I'm not sure how to adapt these two patterns and would welcome suggestions.
>
>
>
> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré 
> wrote:
>
>> Thx John !
>>
>> Regards
>> JB
>>
>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>> > Thank you.
>> >
>> > I've created a jira ticket to add SQS and have assigned it to
>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>> >
>> > Modified the documentation to show it as in-progress:
>> > https://github.com/apache/beam/pull/5995
>> >
>> > And will be starting my work
>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>> >
>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré wrote:
>> >
>> > Agree with Ismaël.
>> >
>> > I would be more than happy to help on this one (as I contributed on
>> AMQP
>> > and JMS IOs ;)).
>> >
>> > Regards
>> > JB
>> >
>> > On 19/07/2018 10:39, Ismaël Mejía wrote:
>> > > Thanks for your interest John, it would be a really nice
>> contribution
>> > > to add SQS support.
>> > >
>> > > Some context on the kinesis stuff:
>> > >
>> > > The reason why kinesis is still in a separate module is more
>> related
>> > > to a licensing problem. Kinesis uses some native libraries that
>> are
>> > > published under a not 100% apache compatible license and we are
>> not
>> > > allowed to shade and republish them but it seems there is a
>> workaround
>> > > now, for more details see
>> > > https://issues.apache.org/jira/browse/BEAM-3549
>> > 
>> > > In any case if to use SQS you only need the Apache licensed
>> aws-sdk
>> > > deps it is ok (and a good idea) if you put it in the
>> > > amazon-web-services module.
>> > >
>> > > The kinesis connector is way more complex for multiple reasons,
>> first,
>> > > the raw version of the amazon client libraries is not so
>> ‘friendly’
>> > > and the guys who created KinesisIO had to do some workarounds to
>> > > provide accurate checkpointing/watermarks. So since SQS is a way
>> > > simpler system you should probably be ok basing it in simpler
>> sources
>> > > like AMQP or JMS.
>> > >
>> > > If you feel like to, please create the JIRA and don’t hesitate to
>> ask
>> > > questions if you find issues or if you need some review.
>> > >
>> > >> On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik wrote:
>> > >>
>> > >>
>> > >>
>> > >>> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis <johnrle...@gmail.com> wrote:
>> > >>>
>> > >>> I need an SQS source for my project that is using beam. A brief
>> > search did not turn up any in-progress work in this area. Please
>> > point me to the right repo if I missed it.
>> > >>
>> > >>
>> > >> To my knowledge there is none and nobody has marked it in
>> > progress on https://beam.apache.org/documentation/io/built-in/
>> > . It would be
>> > good to create a JIRA issue on https://issues.apache.org/ and send
>> a
>> > PR to add SQS to the inprogress list referencing your JIRA. I added
>> > you as a contributor in JIRA so you should be able to assign
>> > yourself to any issues that you create.
>> > >>
>> > >>>
>> > >>> Assuming there is no in-progress effort, I would like to
>> > contribute an Amazon SQS source. I have a few questions before I
>> begin.
>> > >>
>> > >>
>> > >> Great, note that this is a good starting point for authoring an
>> > IO transform:
>> > https://beam.apache.org

Re: An update on Eugene

2018-07-16 Thread Raghu Angadi
You will be missed equally by both the developers and the users.

Thanks for your contributions across the entire Beam code base and the
community.
All the best in your new projects.

Raghu.

On Mon, Jul 16, 2018 at 12:17 PM Eugene Kirpichov 
wrote:

> Hi beamers,
>
> After 5.5 years working on data processing systems at Google, several of
> these years working on Dataflow and Beam, I am moving on to do something
> new (also at Google) in the area of programming models for machine
> learning. Anybody who worked with me closely knows how much I love building
> programming models, so I could not pass up on the opportunity to build a
> new one - I expect to have a lot of fun there!
>
> On the new team we very much plan to make things open-source when the time
> is right, and make use of Beam, just as TensorFlow does - so I will stay in
> touch with the community, and I expect that we will still work together on
> some things. However, Beam will no longer be the main focus of my work.
>
> I've made the decision a couple months ago and have spent the time since
> then getting things into a good state and handing over the community
> efforts in which I have played a particularly active role - they are in
> very capable hands:
> - Robert Bradshaw and Ankur Goenka on Google side are taking charge of
> Portable Runners (e.g. the Portable Flink runner).
> - Luke Cwik will be in charge of the future of Splittable DoFn. Ismael
> Mejia has also been involved in the effort and actively helping, and I
> believe he continues to do so.
> - The Beam IO ecosystem in general is in very good shape (perhaps the best
> in the industry) and does not need a lot of constant direction; and it has
> a great community (thanks JB, Ismael, Etienne and many others!) - however,
> on Google side, Chamikara Jayalath will take it over.
>
> It was a great pleasure working with you all. My last day formally on Beam
> will be this coming Friday, then I'll take a couple weeks of vacation and
> jump right in on the new team.
>
> Of course, if my involvement in something is necessary, I'm still
> available on all the same channels as always (email, Slack, Hangouts) -
> but, in general, please contact the folks mentioned above instead of me
> about the respective matters from now on.
>
> Thanks!
>


Re: [DISCUSS] Automation for Java code formatting

2018-06-27 Thread Raghu Angadi
On Wed, Jun 27, 2018 at 10:13 AM Kenneth Knowles  wrote:

> Nope! No discretion allowed :-)
>

+1. Fair enough!


>
> On Wed, Jun 27, 2018 at 9:57 AM Raghu Angadi  wrote:
>
>> +1.
>>
>> Wondering if it can be configured to reformat only what we care most
>> about (2 space indentation etc), allowing some discretion on the edges. An
>> example of inconsistent formatting that ends up in my code:
>> ---
>> anObject.someLongMethodName(arg_number_1,
>>arg_number_2);
>> --- vs ---
>> anObject.anotherMethodName(
>>   arg_number_1,
>>   arg_number_2
>> );
>>
>>
>> On Wed, Jun 27, 2018 at 9:41 AM Lukasz Cwik  wrote:
>>
>>> It wasn't clear to me that the intent was to autoformat all the code
>>> from the proposal initially. If that's the case, then the delta is quite
>>> small typically.
>>>
>>> Also, it would be easier if we recommended that users run
>>> "./gradlew spotlessApply" which will run spotless on all modules.
>>>
>>> On Wed, Jun 27, 2018 at 9:31 AM Kenneth Knowles  wrote:
>>>
>>>> Luke: the proposal here solves exactly what you are talking about.
>>>>
>>>> The problem you describe happens when the PR author uses autoformat but
>>>> the baseline is not already autoformatted. What I am proposing is to make
>>>> sure the baseline is already autoformatted, so PRs never have extraneous
>>>> formatting changes.
>>>>
>>>> Rafael: the default setting on GitHub is "allow edits by maintainers"
>>>> so actually a committer can run spotless on behalf of a contributor and
>>>> push the fixup. I have done this. It also lets a committer fix up a
>>>> good PR and merge it even if the contributor is, say, asleep.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jun 27, 2018 at 9:24 AM Rafael Fernandez 
>>>> wrote:
>>>>
>>>>> Luke: Anything that helps contributors and reviewers work better
>>>>> together - +1! :D
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 27, 2018 at 9:04 AM Lukasz Cwik  wrote:
>>>>>
>>>>>> If spotless is run against a PR that is already well formatted its a
>>>>>> non-issue as the formatting changes are usually related to the change 
>>>>>> but I
>>>>>> have reviewed a few PRs that have 100s of lines of formatting change 
>>>>>> which
>>>>>> really obfuscates the work.
>>>>>> Instead of asking contributors to run spotless, can we have a cron
>>>>>> job run it across the project like once a day/week/... and cut a PR?
>>>>>>
>>>>>> On Wed, Jun 27, 2018 at 8:07 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> Good points, Dan. Checkstyle will still run, but just focused on the
>>>>>>> things that go beyond format.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Jun 27, 2018 at 8:03 AM Etienne Chauchot <
>>>>>>> echauc...@apache.org> wrote:
>>>>>>>
>>>>>>>> +1 !
>>>>>>>> It's my custom to avoid reformatting to spare meaningless diff
>>>>>>>> burden to the reviewer. Now it will be over, thanks.
>>>>>>>>
>>>>>>>> Etienne
>>>>>>>>
>>>>>>>> Le mardi 26 juin 2018 à 21:15 -0700, Kenneth Knowles a écrit :
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I like readable code, but I don't like formatting it myself. And I
>>>>>>>> _really_ don't like discussing in code review. "Spotless" [1] can 
>>>>>>>> enforce -
>>>>>>>> and automatically apply - automatic formatting for Java, Groovy, and 
>>>>>>>> some
>>>>>>>> others.
>>>>>>>>
>>>>>>>> This is not about style or wanting a particular layout. This is
>>>>>>>> about automation, contributor experience, and streamlining review
>>>>>>>>
>>>>>>>>  - Contributor experience: MUCH better than checkstyle: error
>>

Re: [DISCUSS] Automation for Java code formatting

2018-06-27 Thread Raghu Angadi
+1.

Wondering if it can be configured to reformat only what we care most about
(2 space indentation etc), allowing some discretion on the edges. An
example of inconsistent formatting that ends up in my code:
---
anObject.someLongMethodName(arg_number_1,
   arg_number_2);
--- vs ---
anObject.anotherMethodName(
  arg_number_1,
  arg_number_2
);


On Wed, Jun 27, 2018 at 9:41 AM Lukasz Cwik  wrote:

> It wasn't clear to me that the intent was to autoformat all the code from
> the proposal initially. If that's the case, then the delta is quite small
> typically.
>
> Also, it would be easier if we recommended that users run "./gradlew
> spotlessApply"
> which will run spotless on all modules.
>
> On Wed, Jun 27, 2018 at 9:31 AM Kenneth Knowles  wrote:
>
>> Luke: the proposal here solves exactly what you are talking about.
>>
>> The problem you describe happens when the PR author uses autoformat but
>> the baseline is not already autoformatted. What I am proposing is to make
>> sure the baseline is already autoformatted, so PRs never have extraneous
>> formatting changes.
>>
>> Rafael: the default setting on GitHub is "allow edits by maintainers" so
>> actually a committer can run spotless on behalf of a contributor and push
>> the fixup. I have done this. It also lets a committer fix up a good PR
>> and merge it even if the contributor is, say, asleep.
>>
>> Kenn
>>
>> On Wed, Jun 27, 2018 at 9:24 AM Rafael Fernandez 
>> wrote:
>>
>>> Luke: Anything that helps contributors and reviewers work better
>>> together - +1! :D
>>>
>>>
>>>
>>> On Wed, Jun 27, 2018 at 9:04 AM Lukasz Cwik  wrote:
>>>
 If spotless is run against a PR that is already well formatted its a
 non-issue as the formatting changes are usually related to the change but I
 have reviewed a few PRs that have 100s of lines of formatting change which
 really obfuscates the work.
 Instead of asking contributors to run spotless, can we have a cron job
 run it across the project like once a day/week/... and cut a PR?

 On Wed, Jun 27, 2018 at 8:07 AM Kenneth Knowles  wrote:

> Good points, Dan. Checkstyle will still run, but just focused on the
> things that go beyond format.
>
> Kenn
>
> On Wed, Jun 27, 2018 at 8:03 AM Etienne Chauchot 
> wrote:
>
>> +1 !
>> It's my custom to avoid reformatting to spare meaningless diff burden
>> to the reviewer. Now it will be over, thanks.
>>
>> Etienne
>>
>> Le mardi 26 juin 2018 à 21:15 -0700, Kenneth Knowles a écrit :
>>
>> Hi all,
>>
>> I like readable code, but I don't like formatting it myself. And I
>> _really_ don't like discussing in code review. "Spotless" [1] can 
>> enforce -
>> and automatically apply - automatic formatting for Java, Groovy, and some
>> others.
>>
>> This is not about style or wanting a particular layout. This is about
>> automation, contributor experience, and streamlining review
>>
>>  - Contributor experience: MUCH better than checkstyle: error message
>> just says "run ./gradlew :beam-your-module:spotlessApply" instead of
>> telling them to go in and manually edit.
>>
>>  - Automation: You want to use autoformat so you don't have to format
>> code by hand. But if you autoformat a file that was in some other format,
>> then you touch a bunch of unrelated lines. If the file is already
>> autoformatted, it is much better.
>>
>>  - Review: Never talk about code formatting ever again. A PR also
>> needs baseline to already be autoformatted or formatting will make it
>> unclear which lines are really changed.
>>
>> This is already available via applyJavaNature(enableSpotless: true)
>> and it is turned on for SQL and our buildSrc gradle plugins. It is very
>> nice. There is a JIRA [2] to turn it on for the whole code base. Personally,
>> Personally,
>> I think (a) every module could make a different choice if the main
>> contributors feel strongly and (b) it is objectively better to always
>> autoformat :-)
>>
>> WDYT? If we do it, it is trivial to add it module-at-a-time or
>> globally. If someone conflicts with a massive autoformat commit, they can
>> just keep their changes and autoformat them and it is done.
>>
>> Kenn
>>
>> [1] https://github.com/diffplug/spotless/tree/master/plugin-gradle
>> [2] https://issues.apache.org/jira/browse/BEAM-4394
>>
>>


Re: [FYI] New Apache Beam Swag Store!

2018-06-08 Thread Raghu Angadi
Woo-hoo! This is terrific.

If we are increasing color choices I would like black or charcoal... Beam
logo would really pop on a dark background.

On Fri, Jun 8, 2018 at 3:32 PM Griselda Cuevas  wrote:

> Hi Beam Community,
>
> I just want to share with you the exciting news about our brand new Apache
> Beam Swag Store!
>
> You can find it here: https://store-beam.myshopify.com/
>
> *How does it work?*
>
>- You can just select the items you want and check-out. Our Vendor
>ships to anywhere in the world and normally can have swag to be delivered
>within 1 week. Each company or user will need to pay for their own swag.
>- If you are hosting an event or representing Beam at one, reach out
>to me or the beam-events-meetups slack channel, I'll be happy to review
>your event and see if we can sponsor the swag. We'll have codes for this
>occasions thanks to Google, who has sponsored an initial inventory.
>
> If you have feedback, ideas on new swag, questions or suggestions, reach
> out to me and/or Matthias Baetens.
>
> Happy Friday!
> G
>
>
>


Re: [VOTE] Code Review Process

2018-06-04 Thread Raghu Angadi
+1

On Fri, Jun 1, 2018 at 10:25 AM Thomas Groh  wrote:

> As we seem to largely have consensus in "Reducing Committer Load for Code
> Reviews"[1], this is a vote to change the Beam policy on Code Reviews to
> require that
>
> (1) At least one committer is involved with the code review, as either a
> reviewer or as the author
> (2) A contributor has approved the change
>
> prior to merging any change.
>
> This changes our policy from its current requirement that at least one
> committer *who is not the author* has approved the change prior to merging.
> We believe that changing this process will improve code review throughput,
> reduce committer load, and engage more of the community in the code review
> process.
>
> Please vote:
> [ ] +1: Accept the above proposal to change the Beam code review/merge
> policy
> [ ] -1: Leave the Code Review policy unchanged
>
> Thanks,
>
> Thomas
>
> [1]
> https://lists.apache.org/thread.html/7c1fde3884fbefacc252b6d4b434f9a9c2cf024f381654aa3e47df18@%3Cdev.beam.apache.org%3E
>


Re: What is the future of Reshuffle?

2018-05-21 Thread Raghu Angadi
Filed https://issues.apache.org/jira/browse/BEAM-4372 (unassigned).

On Mon, May 21, 2018 at 10:22 AM Raghu Angadi  wrote:

>
>
> On Mon, May 21, 2018 at 9:56 AM Robert Bradshaw 
> wrote:
>
>> We should probably keep the warning and all the caveats until we
>> introduce the alternative (and migrate to it for the non-parallelism uses
>> of reshuffle). I was just proposing we do this via a separate transform
>> that just calls Reshuffle until we have the new story fully fleshed out (I
>> don't know if any runner supports RequiresStableInput, and it isn't
>> translated in the Fn API) to avoid being in this intermediate state for
>> yet another year.
>>
>
> I think all warnings and caveats really belong in JavaDoc for GroupByKey,
> since that is the actual transform the the semantics that we are concerned
> about incorrect use. A reshuffle transform can refer the users there.
>
> In addition, I think Reshuffle is really good name for reshuffle transform
> meant for most users.
>
> That said, any reorganization is much better than deprecation.
>
> Raghu.
>
>
>>
>> On Sun, May 20, 2018 at 6:38 PM Raghu Angadi  wrote:
>>
>>>
>>>
>>> On Sat, May 19, 2018 at 10:55 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> On Sat, May 19, 2018 at 6:27 PM Raghu Angadi 
>>>> wrote:
>>>>
>>>>> [...]
>>>>>
>>>> I think it would be much more user friendly to un-deprecate it to add a
>>>>> warning for advanced users about non-portability of durability/replay
>>>>> guarantees/stable input assumptions.
>>>>>
>>>>>>
>>>> Yes, I think everyone in this thread is in agreement here. We should
>>>> provide a *different* transform that provides the durability guarantees
>>>> (with caveats). In the meantime, this delegating to a reshuffle would be
>>>> better than using a reshuffle directly.
>>>>
>>>
>>> Great. Sent a PR to undeprecate Reshuffle :
>>> https://github.com/apache/beam/pull/5432
>>> The wording there for JavaDoc just a proposal...
>>>
>>> Raghu.
>>>
>>>
>>>> We tend to put in reshuffles in order to "commit" these random values
>>>>>>>>>> and make them stable for the next stage, to be used to provide the 
>>>>>>>>>> needed
>>>>>>>>>> idempotency for sinks.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In such cases, I think the author should error out on the runner
>>>>>>>>> that don't provide that guarantee. That is what ExactlyOnceSink in 
>>>>>>>>> KafkaIO
>>>>>>>>> does [1].
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>>>>>>>>
>>>>>>>>
>>>>>>>> We're moving to a world where the runner may not be known at
>>>>>>>> pipeline construction time. However, explicitly using a (distinct)
>>>>>>>> make-input-stable transform when that's the intent (which could be a
>>>>>>>> primitive that runners should implement, possibly by swapping in 
>>>>>>>> Reshuffle,
>>>>>>>> or reject) would allow for this. That being said, the exact semantics 
>>>>>>>> of
>>>>>>>> this transform is a bit of a rabbit hole which is why we never 
>>>>>>>> finished the
>>>>>>>> job of deprecating Reshuffle. This is a case where doing something is
>>>>>>>> better than doing nothing, and our use of URNs for this kind of thing 
>>>>>>>> is
>>>>>>>> flexible enough that we can deprecate old ones if/when we have time to
>>>>>>>> pound out the right solution.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw <
>>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi <
>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Kenn.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles <
>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The fact that its usage has grown probably indicates that we
>>>>>>>>>>>>>> have a large number of transforms that can easily cause data 
>>>>>>>>>>>>>> loss /
>>>>>>>>>>>>>> duplication.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is this specific to Reshuffle or it is true for any
>>>>>>>>>>>>> GroupByKey? I see Reshuffle as just a wrapper around GBK.
>>>>>>>>>>>>>
>>>>>>>>>>>> The issue is when it's used in such a way that data corruption
>>>>>>>>>>>> can occur when the underlying GBK output is not stable.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Could you describe this breakage bit more in detail or give a
>>>>>>>>>>> example? Apologies in advance, I know this came up in multiple 
>>>>>>>>>>> contexts in
>>>>>>>>>>> the past, but I haven't grokked the issue well. It is the window 
>>>>>>>>>>> rewrite
>>>>>>>>>>> that Reshuffle does that causes misuse of GBK?
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>


Re: What is the future of Reshuffle?

2018-05-21 Thread Raghu Angadi
On Mon, May 21, 2018 at 9:56 AM Robert Bradshaw  wrote:

> We should probably keep the warning and all the caveats until we introduce
> the alternative (and migrate to it for the non-parallelism uses of
> reshuffle). I was just proposing we do this via a separate transform that
> just calls Reshuffle until we have the new story fully fleshed out (I don't
> know if any runner supports RequiresStableInput, and it isn't translated
> in the Fn API) to avoid being in this intermediate state for yet another
> year.
>

I think all warnings and caveats really belong in the JavaDoc for GroupByKey,
since that is the actual transform whose semantics we are concerned about
being used incorrectly. A reshuffle transform can refer users there.

In addition, I think Reshuffle is a really good name for the reshuffle
transform meant for most users.

That said, any reorganization is much better than deprecation.

Raghu.


>
> On Sun, May 20, 2018 at 6:38 PM Raghu Angadi  wrote:
>
>>
>>
>> On Sat, May 19, 2018 at 10:55 PM Robert Bradshaw 
>> wrote:
>>
>>> On Sat, May 19, 2018 at 6:27 PM Raghu Angadi  wrote:
>>>
>>>> [...]
>>>>
>>> I think it would be much more user friendly to un-deprecate it to add a
>>>> warning for advanced users about non-portability of durability/replay
>>>> guarantees/stable input assumptions.
>>>>
>>>>>
>>> Yes, I think everyone in this thread is in agreement here. We should
>>> provide a *different* transform that provides the durability guarantees
>>> (with caveats). In the meantime, this delegating to a reshuffle would be
>>> better than using a reshuffle directly.
>>>
>>
>> Great. Sent a PR to undeprecate Reshuffle :
>> https://github.com/apache/beam/pull/5432
>> The wording there for JavaDoc just a proposal...
>>
>> Raghu.
>>
>>
>>> We tend to put in reshuffles in order to "commit" these random values
>>>>>>>>> and make them stable for the next stage, to be used to provide the 
>>>>>>>>> needed
>>>>>>>>> idempotency for sinks.
>>>>>>>>>
>>>>>>>>
>>>>>>>> In such cases, I think the author should error out on the runner
>>>>>>>> that don't provide that guarantee. That is what ExactlyOnceSink in 
>>>>>>>> KafkaIO
>>>>>>>> does [1].
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>>>>>>>
>>>>>>>
>>>>>>> We're moving to a world where the runner may not be known at
>>>>>>> pipeline construction time. However, explicitly using a (distinct)
>>>>>>> make-input-stable transform when that's the intent (which could be a
>>>>>>> primitive that runners should implement, possibly by swapping in 
>>>>>>> Reshuffle,
>>>>>>> or reject) would allow for this. That being said, the exact semantics of
>>>>>>> this transform is a bit of a rabbit hole which is why we never finished 
>>>>>>> the
>>>>>>> job of deprecating Reshuffle. This is a case where doing something is
>>>>>>> better than doing nothing, and our use of URNs for this kind of thing is
>>>>>>> flexible enough that we can deprecate old ones if/when we have time to
>>>>>>> pound out the right solution.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi <
>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Kenn.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles <
>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The fact that its usage has grown probably indicates that we
>>>>>>>>>>>>> have a large number of transforms that can easily cause data loss 
>>>>>>>>>>>>> /
>>>>>>>>>>>>> duplication.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Is this specific to Reshuffle or it is true for any GroupByKey?
>>>>>>>>>>>> I see Reshuffle as just a wrapper around GBK.
>>>>>>>>>>>>
>>>>>>>>>>> The issue is when it's used in such a way that data corruption
>>>>>>>>>>> can occur when the underlying GBK output is not stable.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Could you describe this breakage bit more in detail or give a
>>>>>>>>>> example? Apologies in advance, I know this came up in multiple 
>>>>>>>>>> contexts in
>>>>>>>>>> the past, but I haven't grokked the issue well. It is the window 
>>>>>>>>>> rewrite
>>>>>>>>>> that Reshuffle does that causes misuse of GBK?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>


Re: What is the future of Reshuffle?

2018-05-20 Thread Raghu Angadi
On Sat, May 19, 2018 at 10:55 PM Robert Bradshaw 
wrote:

> On Sat, May 19, 2018 at 6:27 PM Raghu Angadi  wrote:
>
>> [...]
>>
> I think it would be much more user friendly to un-deprecate it to add a
>> warning for advanced users about non-portability of durability/replay
>> guarantees/stable input assumptions.
>>
>>>
> Yes, I think everyone in this thread is in agreement here. We should
> provide a *different* transform that provides the durability guarantees
> (with caveats). In the meantime, this delegating to a reshuffle would be
> better than using a reshuffle directly.
>

Great. Sent a PR to undeprecate Reshuffle:
https://github.com/apache/beam/pull/5432
The wording there for the JavaDoc is just a proposal...

Raghu.


> We tend to put in reshuffles in order to "commit" these random values and
>>>>>>> make them stable for the next stage, to be used to provide the needed
>>>>>>> idempotency for sinks.
>>>>>>>
>>>>>>
>>>>>> In such cases, I think the author should error out on the runner that
>>>>>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO 
>>>>>> does
>>>>>> [1].
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>>>>>
>>>>>
>>>>> We're moving to a world where the runner may not be known at pipeline
>>>>> construction time. However, explicitly using a (distinct) 
>>>>> make-input-stable
>>>>> transform when that's the intent (which could be a primitive that runners
>>>>> should implement, possibly by swapping in Reshuffle, or reject) would 
>>>>> allow
>>>>> for this. That being said, the exact semantics of this transform is a bit
>>>>> of a rabbit hole which is why we never finished the job of deprecating
>>>>> Reshuffle. This is a case where doing something is better than doing
>>>>> nothing, and our use of URNs for this kind of thing is flexible enough 
>>>>> that
>>>>> we can deprecate old ones if/when we have time to pound out the right
>>>>> solution.
>>>>>
>>>>>
>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi 
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Kenn.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> The fact that its usage has grown probably indicates that we
>>>>>>>>>>> have a large number of transforms that can easily cause data loss /
>>>>>>>>>>> duplication.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Is this specific to Reshuffle or it is true for any GroupByKey? I
>>>>>>>>>> see Reshuffle as just a wrapper around GBK.
>>>>>>>>>>
>>>>>>>>> The issue is when it's used in such a way that data corruption can
>>>>>>>>> occur when the underlying GBK output is not stable.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Could you describe this breakage bit more in detail or give a
>>>>>>>> example? Apologies in advance, I know this came up in multiple 
>>>>>>>> contexts in
>>>>>>>> the past, but I haven't grokked the issue well. It is the window 
>>>>>>>> rewrite
>>>>>>>> that Reshuffle does that causes misuse of GBK?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>


Re: What is the future of Reshuffle?

2018-05-19 Thread Raghu Angadi
On Sat, May 19, 2018 at 8:11 AM Robert Bradshaw  wrote:

> On Fri, May 18, 2018 at 6:29 PM Raghu Angadi  wrote:
>
>> True. I am still failing to see what is broken about Reshuffle that is
>> also not broken with GroupByKey transform. If someone depends on GroupByKey
>> to get stable input, isn't that equally incorrect/unportable?
>>
>
> Yes, if people use GBK in that way, it's also just as broken. The thought
> is that fewer people would use it with that intent, as GBK is not a no-op
> (it transforms the shape of the data, and also does not preserve
> windowing). This is in contrast to Reshuffle which was encouraged for this
> usecase.
>

I see. I am not aware of any recommendation for users (excluding advanced
users) to use this for stable input/durability guarantees. Every single
case where I recommended Reshuffle was related to parallelism (there were
many such cases). Most uses of Reshuffle/GBK for stable input were done
consciously by the authors, fully aware of the caveats (SDF in Dataflow,
the Kafka EOS sink's use of GBK, etc.).

As a result, deprecation is only hurting the innocent users who are using
Reshuffle correctly.

I think it would be much more user-friendly to un-deprecate it and add a
warning for advanced users about the non-portability of durability/replay
guarantees/stable input assumptions.

Raghu.
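
For the stable-input case specifically, a minimal sketch of the more explicit
direction mentioned in this thread, assuming the @RequiresStableInput annotation
(runner support for it was still limited at this time; the DoFn below is made up):

import org.apache.beam.sdk.transforms.DoFn;

// Sketch: instead of inserting a Reshuffle to get replay-stable input, a DoFn
// that performs non-idempotent side effects can declare the requirement
// explicitly and let the runner decide how (or whether) to satisfy it.
class NonIdempotentWriteFn extends DoFn<String, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(ProcessContext c) {
    writeToExternalSystem(c.element()); // hypothetical external write
  }

  private void writeToExternalSystem(String record) {
    // stand-in for a write keyed by a previously generated ID
  }
}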

We tend to put in reshuffles in order to "commit" these random values and
>>>>> make them stable for the next stage, to be used to provide the needed
>>>>> idempotency for sinks.
>>>>>
>>>>
>>>> In such cases, I think the author should error out on the runner that
>>>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO does
>>>> [1].
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>>>
>>>
>>> We're moving to a world where the runner may not be known at pipeline
>>> construction time. However, explicitly using a (distinct) make-input-stable
>>> transform when that's the intent (which could be a primitive that runners
>>> should implement, possibly by swapping in Reshuffle, or reject) would allow
>>> for this. That being said, the exact semantics of this transform is a bit
>>> of a rabbit hole which is why we never finished the job of deprecating
>>> Reshuffle. This is a case where doing something is better than doing
>>> nothing, and our use of URNs for this kind of thing is flexible enough that
>>> we can deprecate old ones if/when we have time to pound out the right
>>> solution.
>>>
>>>
>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw 
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Kenn.
>>>>>>>>
>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The fact that its usage has grown probably indicates that we have
>>>>>>>>> a large number of transforms that can easily cause data loss / 
>>>>>>>>> duplication.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Is this specific to Reshuffle or it is true for any GroupByKey? I
>>>>>>>> see Reshuffle as just a wrapper around GBK.
>>>>>>>>
>>>>>>> The issue is when it's used in such a way that data corruption can
>>>>>>> occur when the underlying GBK output is not stable.
>>>>>>>
>>>>>>
>>>>>> Could you describe this breakage bit more in detail or give a
>>>>>> example? Apologies in advance, I know this came up in multiple contexts 
>>>>>> in
>>>>>> the past, but I haven't grokked the issue well. It is the window rewrite
>>>>>> that Reshuffle does that causes misuse of GBK?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>


Re: What is the future of Reshuffle?

2018-05-18 Thread Raghu Angadi
On Fri, May 18, 2018 at 5:34 PM Robert Bradshaw  wrote:

> Ah, thanks, that makes sense. That implies to me Reshuffle is no more
>> broken than GBK itself. May be Reshuffle.viaRandomKey() could have a clear
>> caveat. Reshuffle's JavaDoc could add a caveat too about non-deterministic
>> keys and retries (though it applies to GroupByKey in general).
>>
>
> The "randomness" of Reshuffle.viaRandomKey() is fine, as the randomly
> generated key is never exposed to the users (so it doesn't matter if it
> changes).
>

Agreed.


> Reshuffle is broken if you are using it to get stable input on a runner
> that doesn't always have stable input as an implementation detail of GBKs.
>

True. I am still failing to see what is broken about Reshuffle that is not
also broken with the GroupByKey transform. If someone depends on GroupByKey to
get stable input, isn't that equally incorrect/unportable?

Raghu.
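
To make the failure mode under discussion concrete, a small sketch (my own
illustration, not code from the thread): a step that attaches randomly generated
IDs ahead of a Reshuffle/GBK is only safe for a non-idempotent sink if the
grouping's input is stable across retries.

import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: if the runner re-executes this step after a failure (i.e. the GBK's
// input is not stable), the replayed elements get different IDs, and a sink
// that deduplicates on the ID downstream may end up writing duplicates.
class AssignRandomIdFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Re-generated on every (re)execution; Beam does not guarantee this output
    // is materialized before retries unless the runner provides stable input.
    c.output(KV.of(UUID.randomUUID().toString(), c.element()));
  }
}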

>
> We tend to put in reshuffles in order to "commit" these random values and
>>> make them stable for the next stage, to be used to provide the needed
>>> idempotency for sinks.
>>>
>>
>> In such cases, I think the author should error out on the runner that
>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO does
>> [1].
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>
>
> We're moving to a world where the runner may not be known at pipeline
> construction time. However, explicitly using a (distinct) make-input-stable
> transform when that's the intent (which could be a primitive that runners
> should implement, possibly by swapping in Reshuffle, or reject) would allow
> for this. That being said, the exact semantics of this transform is a bit
> of a rabbit hole which is why we never finished the job of deprecating
> Reshuffle. This is a case where doing something is better than doing
> nothing, and our use of URNs for this kind of thing is flexible enough that
> we can deprecate old ones if/when we have time to pound out the right
> solution.
>
>
>>
>>> Kenn
>>>
>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi  wrote:
>>>
>>>>
>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi 
>>>>> wrote:
>>>>>
>>>>>> Thanks Kenn.
>>>>>>
>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> The fact that its usage has grown probably indicates that we have a
>>>>>>> large number of transforms that can easily cause data loss / 
>>>>>>> duplication.
>>>>>>>
>>>>>>
>>>>>> Is this specific to Reshuffle or it is true for any GroupByKey? I see
>>>>>> Reshuffle as just a wrapper around GBK.
>>>>>>
>>>>> The issue is when it's used in such a way that data corruption can
>>>>> occur when the underlying GBK output is not stable.
>>>>>
>>>>
>>>> Could you describe this breakage bit more in detail or give a example?
>>>> Apologies in advance, I know this came up in multiple contexts in the past,
>>>> but I haven't grokked the issue well. It is the window rewrite that
>>>> Reshuffle does that causes misuse of GBK?
>>>>
>>>> Thanks.
>>>>
>>>


Re: What is the future of Reshuffle?

2018-05-18 Thread Raghu Angadi
On Fri, May 18, 2018 at 4:07 PM Kenneth Knowles  wrote:

> It isn't any particular logic in Reshuffle - it is, semantically, an
> identity transform. It is the fact that other runners are perfectly able to
> re-run transform prior to a GBK. So, for example, randomly generated IDs
> will be re-generated.
>

Ah, thanks, that makes sense. That implies to me that Reshuffle is no more
broken than GBK itself. Maybe Reshuffle.viaRandomKey() could have a clear
caveat. Reshuffle's JavaDoc could add a caveat too about non-deterministic
keys and retries (though it applies to GroupByKey in general).

We tend to put in reshuffles in order to "commit" these random values and
> make them stable for the next stage, to be used to provide the needed
> idempotency for sinks.
>

In such cases, I think the author should error out on runners that don't
provide that guarantee. That is what ExactlyOnceSink in KafkaIO does [1].

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
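
A rough sketch of that fail-fast pattern in general form (not KafkaIO's actual
code; the allow-list below is hypothetical):

import java.util.Collections;
import java.util.Set;
import org.apache.beam.sdk.options.PipelineOptions;

// Sketch: a transform that relies on stable GroupByKey output can refuse to
// expand on runners that are not known to provide that guarantee.
class StableInputGuard {
  private static final Set<String> RUNNERS_WITH_STABLE_GBK_OUTPUT =
      Collections.singleton("DataflowRunner"); // hypothetical allow-list

  static void requireStableGbkOutput(PipelineOptions options) {
    String runner = options.getRunner().getSimpleName();
    if (!RUNNERS_WITH_STABLE_GBK_OUTPUT.contains(runner)) {
      throw new UnsupportedOperationException(
          "This transform relies on stable GroupByKey output, which "
              + runner + " does not guarantee.");
    }
  }
}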


> Kenn
>
> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi  wrote:
>
>>
>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw 
>> wrote:
>>
>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi 
>>> wrote:
>>>
>>>> Thanks Kenn.
>>>>
>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> The fact that its usage has grown probably indicates that we have a
>>>>> large number of transforms that can easily cause data loss / duplication.
>>>>>
>>>>
>>>> Is this specific to Reshuffle or it is true for any GroupByKey? I see
>>>> Reshuffle as just a wrapper around GBK.
>>>>
>>> The issue is when it's used in such a way that data corruption can occur
>>> when the underlying GBK output is not stable.
>>>
>>
>> Could you describe this breakage bit more in detail or give a example?
>> Apologies in advance, I know this came up in multiple contexts in the past,
>> but I haven't grokked the issue well. It is the window rewrite that
>> Reshuffle does that causes misuse of GBK?
>>
>> Thanks.
>>
>


Re: What is the future of Reshuffle?

2018-05-18 Thread Raghu Angadi
On Fri, May 18, 2018 at 12:22 PM Robert Bradshaw 
wrote:

> [resending]
>
> Agreed that keeping this deprecated without a clear replacement for so long
> is not ideal.
>
> I would at least break this into two separate transforms, the
> parallelism-breaking one (which seems OK) and the stable input one (which
> may just call the parallelism-breaking one, but should be decorated with
> lots of caveats and maybe even still have the deprecated annotation).
>

+1. The parallelism-breaking one is the most relevant to many users. Would love
to see that part un-deprecated, ideally keeping the name Reshuffle.

Raghu.
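
For reference, a minimal sketch of the parallelism-breaking usage (pipeline
contents are made up): a tiny collection fans out into many elements, and the
reshuffle keeps the expensive downstream step from being fused onto the few
workers that produced the seeds.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

public class ReshuffleForParallelism {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of("shard-1", "shard-2", "shard-3"))
        .apply("FanOut", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (int i = 0; i < 10000; i++) {
              c.output(c.element() + "/" + i);
            }
          }
        }))
        // Without this, a runner may fuse FanOut and ExpensiveStep together,
        // limiting ExpensiveStep's parallelism to the three input elements.
        .apply("BreakFusion", Reshuffle.viaRandomKey())
        .apply("ExpensiveStep", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(c.element().toUpperCase()); // stand-in for real work
          }
        }));
    p.run().waitUntilFinish();
  }
}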


>
>
> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles  wrote:
>
>> The fact that its usage has grown probably indicates that we have a large
>> number of transforms that can easily cause data loss / duplication.
>>
>> Yes, it is deprecated because it is primarily used as a Dataflow-specific
>> way to ensure stable input. My understanding is that the SparkRunner also
>> materializes at every GBK so it works there too (is this still the case?).
>> It doesn't work at all for other runners AFAIK. So it is @Deprecated not
>> because there is a replacement, but because it is kind of dangerous to use. 
>> Beam
>> could just say "GBK must ensure stable output" and "a composite containing
>> a GBK has to ensure stable output even if replaced" and that would solve
>> the issue, but I think it would make Beam on Flink impossibly slow - I
>> could be wrong about that. Generally stable input is tied to durability
>> model which is a key design point for engines.
>>
>> True that it isn't the only use, and I know you have been trying to nail
>> down what the uses actually are. Ben wrote up various uses in a portable
>> manner at https://beam.apache.org/documentation/execution-model.
>>
>>  - Coupled failure is the use where Reshuffle is to provide stable input
>>  - Breaking dependent parallelism is more portable - but since it is the
>> identity transform a runner may just elide it; it is a hint, basically, and
>> that seem OK (but can we do it more directly?)
>>
>> What I don't want is to build something where the implementation details
>> are the spec, and not fundamental, which is sort of where Reshuffle lies.
>> This thread highlights that this is a pretty urgent problem with our SDKs
>> and runners that it would be very helpful to work on.
>>
>> Kenn
>>
>>
>>
>> On Fri, May 18, 2018 at 7:50 AM Eugene Kirpichov 
>> wrote:
>>
>>> Agreed that it should be undeprecated, many users are getting confused
>>> by this.
>>> I know that some people are working on a replacement for at least one of
>>> its use cases (RequiresStableInput), but the use case of breaking fusion
>>> is, as of yet, unaddressed, and there's not much to be gained by keeping it
>>> deprecated.
>>>
>>> On Fri, May 18, 2018 at 7:45 AM Raghu Angadi  wrote:
>>>
>>>> I am interested in more clarity on this as well. It has been deprecated
>>>> for a long time without a replacement, and its usage has only grown, both
>>>> within Beam code base as well as in user applications.
>>>>
>>>> If we are certain that it will not be removed before there is a good
>>>> replacement for it, can we undeprecate it until there are proper plans for
>>>> replacement?
>>>>
>>>> On Fri, May 18, 2018 at 7:12 AM Ismaël Mejía  wrote:
>>>>
>>>>> I saw in a recent thread that the use of the Reshuffle transform was
>>>>> recommended to solve an user issue:
>>>>>
>>>>>
>>>>> https://lists.apache.org/thread.html/87ef575ac67948868648e0a8110be242f811bfff8fdaa7f9b758b933@%3Cdev.beam.apache.org%3E
>>>>>
>>>>> I can see why it may fix the reported issue. I am just curious about
>>>>> the fact that the Reshuffle transform is marked as both @Internal and
>>>>> @Deprecated in Beam's SDK.
>>>>>
>>>>> Do we have some alternative? So far the class documentation does not
>>>>> recommend any replacement.
>>>>>
>>>>


Re: What is the future of Reshuffle?

2018-05-18 Thread Raghu Angadi
On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw 
wrote:

> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi  wrote:
>
>> Thanks Kenn.
>>
>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles  wrote:
>>
>>> The fact that its usage has grown probably indicates that we have a
>>> large number of transforms that can easily cause data loss / duplication.
>>>
>>
>> Is this specific to Reshuffle or it is true for any GroupByKey? I see
>> Reshuffle as just a wrapper around GBK.
>>
> The issue is when it's used in such a way that data corruption can occur
> when the underlying GBK output is not stable.
>

Could you describe this breakage in a bit more detail or give an example?
Apologies in advance, I know this came up in multiple contexts in the past,
but I haven't grokked the issue well. Is it the window rewrite that
Reshuffle does that causes the misuse of GBK?

Thanks.


Re: What is the future of Reshuffle?

2018-05-18 Thread Raghu Angadi
Thanks Kenn.

On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles  wrote:

> The fact that its usage has grown probably indicates that we have a large
> number of transforms that can easily cause data loss / duplication.
>

Is this specific to Reshuffle or is it true for any GroupByKey? I see
Reshuffle as just a wrapper around GBK.

Raghu.

>
>
> Yes, it is deprecated because it is primarily used as a Dataflow-specific
> way to ensure stable input. My understanding is that the SparkRunner also
> materializes at every GBK so it works there too (is this still the case?).
> It doesn't work at all for other runners AFAIK. So it is @Deprecated not
> because there is a replacement, but because it is kind of dangerous to use. 
> Beam
> could just say "GBK must ensure stable output" and "a composite containing
> a GBK has to ensure stable output even if replaced" and that would solve
> the issue, but I think it would make Beam on Flink impossibly slow - I
> could be wrong about that. Generally stable input is tied to durability
> model which is a key design point for engines.
>
> True that it isn't the only use, and I know you have been trying to nail
> down what the uses actually are. Ben wrote up various uses in a portable
> manner at https://beam.apache.org/documentation/execution-model.
>
>  - Coupled failure is the use where Reshuffle is to provide stable input
>  - Breaking dependent parallelism is more portable - but since it is the
> identity transform a runner may just elide it; it is a hint, basically, and
> that seem OK (but can we do it more directly?)
>
> What I don't want is to build something where the implementation details
> are the spec, and not fundamental, which is sort of where Reshuffle lies.
> This thread highlights that this is a pretty urgent problem with our SDKs
> and runners that it would be very helpful to work on.
>
> Kenn
>
>
>
> On Fri, May 18, 2018 at 7:50 AM Eugene Kirpichov 
> wrote:
>
>> Agreed that it should be undeprecated, many users are getting confused by
>> this.
>> I know that some people are working on a replacement for at least one of
>> its use cases (RequiresStableInput), but the use case of breaking fusion
>> is, as of yet, unaddressed, and there's not much to be gained by keeping it
>> deprecated.
>>
>> On Fri, May 18, 2018 at 7:45 AM Raghu Angadi  wrote:
>>
>>> I am interested in more clarity on this as well. It has been deprecated
>>> for a long time without a replacement, and its usage has only grown, both
>>> within Beam code base as well as in user applications.
>>>
>>> If we are certain that it will not be removed before there is a good
>>> replacement for it, can we undeprecate it until there are proper plans for
>>> replacement?
>>>
>>> On Fri, May 18, 2018 at 7:12 AM Ismaël Mejía  wrote:
>>>
>>>> I saw in a recent thread that the use of the Reshuffle transform was
>>>> recommended to solve an user issue:
>>>>
>>>>
>>>> https://lists.apache.org/thread.html/87ef575ac67948868648e0a8110be242f811bfff8fdaa7f9b758b933@%3Cdev.beam.apache.org%3E
>>>>
>>>> I can see why it may fix the reported issue. I am just curious about
>>>> the fact that the Reshuffle transform is marked as both @Internal and
>>>> @Deprecated in Beam's SDK.
>>>>
>>>> Do we have some alternative? So far the class documentation does not
>>>> recommend any replacement.
>>>>
>>>


Re: What is the future of Reshuffle?

2018-05-18 Thread Raghu Angadi
I am interested in more clarity on this as well. It has been deprecated for
a long time without a replacement, and its usage has only grown, both
within the Beam code base as well as in user applications.

If we are certain that it will not be removed before there is a good
replacement for it, can we undeprecate it until there are proper plans for
replacement?

On Fri, May 18, 2018 at 7:12 AM Ismaël Mejía  wrote:

> I saw in a recent thread that the use of the Reshuffle transform was
> recommended to solve an user issue:
>
>
> https://lists.apache.org/thread.html/87ef575ac67948868648e0a8110be242f811bfff8fdaa7f9b758b933@%3Cdev.beam.apache.org%3E
>
> I can see why it may fix the reported issue. I am just curious about
> the fact that the Reshuffle transform is marked as both @Internal and
> @Deprecated in Beam's SDK.
>
> Do we have some alternative? So far the class documentation does not
> recommend any replacement.
>


Re: Pubsub to Beam SQL

2018-05-04 Thread Raghu Angadi
On Thu, May 3, 2018 at 12:47 PM Anton Kedin  wrote:

> I think it makes sense for the case when timestamp is provided in the
> payload (including pubsub message attributes).  We can mark the field as an
> event timestamp. But if the timestamp is internally defined by the source
> (pubsub message publish time) and not exposed in the event body, then we
> need a source-specific mechanism to extract and map the event timestamp to
> the schema. This is, of course, if we don't automatically add a magic
> timestamp field which Beam SQL can populate behind the scenes and add to
> the schema. I want to avoid this magic path for now.
>

Commented on the PR. As Kenn mentioned, every element in Beam has an event
timestamp, so there is no requirement for the SQL transform to extract the
timestamp itself. Using the element timestamp takes care of the Pubsub publish
timestamp as well (in fact, this is the default when a timestamp attribute is
not specified in PubsubIO).

How timestamps are customized is specific to each source, so custom
timestamp options seem like they belong in TBLPROPERTIES. E.g. for KafkaIO,
it could specify "logAppendTime", "createTime", or "processingTime", etc.
(though I am not sure how a user can provide their own custom extractor in
Beam SQL; maybe it could support a timestamp field in JSON records).

Raghu.
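
For concreteness, a small sketch of the two PubsubIO behaviors described above
(topic and attribute names are made up): by default the publish time becomes
each element's event timestamp, and withTimestampAttribute switches to a
publisher-populated attribute.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class PubsubTimestamps {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Default: the Pub/Sub publish time is used as each element's event timestamp.
    PCollection<PubsubMessage> byPublishTime =
        p.apply("ReadDefault",
            PubsubIO.readMessagesWithAttributes()
                .fromTopic("projects/my-project/topics/events"));

    // Alternative: a message attribute populated by the publisher becomes the timestamp.
    PCollection<PubsubMessage> byAttribute =
        p.apply("ReadWithTimestampAttribute",
            PubsubIO.readMessagesWithAttributes()
                .fromTopic("projects/my-project/topics/events")
                .withTimestampAttribute("ts"));

    p.run();
  }
}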

>
> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud 
> wrote:
>
>> This sounds awesome!
>>
>> Is event timestamp something that we need to specify for every source? If
>> so, I would suggest we add this as a first-class option on CREATE TABLE
>> rather than something hidden in TBLPROPERTIES.
>>
>> Andrew
>>
>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin  wrote:
>>
>>> Hi
>>>
>>> I am working on adding functionality to support querying Pubsub messages
>>> directly from Beam SQL.
>>>
>>> *Goal*
>>>   Provide Beam users a pure  SQL solution to create the pipelines with
>>> Pubsub as a data source, without the need to set up the pipelines in
>>> Java before applying the query.
>>>
>>> *High level approach*
>>>
>>>-
>>>- Build on top of PubsubIO;
>>>- Pubsub source will be declared using CREATE TABLE DDL statement:
>>>   - Beam SQL already supports declaring sources like Kafka and Text
>>>   using CREATE TABLE DDL;
>>>   - it supports additional configuration using TBLPROPERTIES
>>>   clause. Currently it takes a text blob, where we can put a JSON
>>>   configuration;
>>>   - wrapping PubsubIO into a similar source looks feasible;
>>>- The plan is to initially support messages only with JSON payload:
>>>-
>>>   - more payload formats can be added later;
>>>- Messages will be fully described in the CREATE TABLE statements:
>>>   - event timestamps. Source of the timestamp is configurable. It
>>>   is required by Beam SQL to have an explicit timestamp column for 
>>> windowing
>>>   support;
>>>   - messages attributes map;
>>>   - JSON payload schema;
>>>- Event timestamps will be taken either from publish time or
>>>user-specified message attribute (configurable);
>>>
>>> Thoughts, ideas, comments?
>>>
>>> More details are in the doc here:
>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>
>>>
>>> Thank you,
>>> Anton
>>>
>>


Re: Kafka connector for Beam Python SDK

2018-04-30 Thread Raghu Angadi
On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath 
wrote:

> Hi Aljoscha,
>
> I tried to cover this in the doc. Once we have full support for
> cross-language IO, we can decide this on a case-by-case basis. But I don't
> think we should cease defining new sources/sinks for Beam Python SDK till
> we get to that point. I think there are good reasons for adding Kafka
> support for Python today and many Beam users have request this. Also, note
> that proposed Python Kafka source will be based on the Splittable DoFn
> framework while the current Java version is based on the UnboundedSource
> framework. Here are the reasons that are currently listed in the doc.
>
>
>-
>
>Users might find it useful to have at least one unbounded source and
>sink combination implemented in Python SDK and Kafka is the streaming
>system that makes most sense to support if we just want to add support for
>only one such system in Python SDK.
>-
>
>Not all runners might support cross-language IO. Also some
>user/runner/deployment combinations might require an unbounded source/sink
>implemented in Python SDK.
>-
>
>We recently added Splittable DoFn support to Python SDK. It will be
>good to have at least one production quality Splittable DoFn that will
>server as a good example for any users who wish to implement new Splittable
>DoFn implementations on top of Beam Python SDK.
>
>
+1


>
>-
>
>Cross-language transform feature is currently in the initial
>discussion phase and it could be some time before we can offer existing
>Java implementation of Kafka for Python SDK users.
>-
>
>Cross-language IO might take even longer to reach the point where it's
>fully equivalent in expressive power to a transform written in the host
>language - e.g. supporting host-language lambdas as part of the transform
>configuration is likely to take a lot longer than "first-order"
>cross-language IO. KafkaIO in Java uses lambdas as part of transform
>configuration, e.g. timestamp functions.
>
>
FYI, note that the old withTimestampFn() methods are deprecated and will be
removed soon. The built-in 'withLogAppendTime()', 'withCreateTime()', etc.
cover most use cases, and the replacement for custom timestamps,
'withTimestampPolicyFactory()', takes a factory class.
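
For reference, a minimal sketch of these options in Java; the broker address,
topic, and the custom policy class are placeholders, and the lambda form of
the factory is an assumption:

  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  class KafkaTimestampExamples {
    static KafkaIO.Read<Long, String> read() {
      return KafkaIO.<Long, String>read()
          .withBootstrapServers("broker:9092")             // placeholder address
          .withTopic("events")                             // placeholder topic
          .withKeyDeserializer(LongDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          // one of the built-in timestamp policies:
          .withLogAppendTime();                            // broker-assigned (LogAppendTime) timestamps
      // Alternatives:
      //   .withCreateTime(org.joda.time.Duration.standardSeconds(30))  // producer timestamps, bounded disorder
      //   .withTimestampPolicyFactory(
      //       (tp, previousWatermark) -> new MyTimestampPolicy(previousWatermark))  // MyTimestampPolicy is hypothetical
    }
  }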

 Raghu.


> Thanks,
> Cham
>
> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek 
> wrote:
>
>> Is this what we want to do in the long run, i.e. implement copies of
>> connectors for different SDKs? I thought the plan was to enable using
>> connectors written in different languages, i.e. use the Java Kafka I/O from
>> python. This way we wouldn't duplicate bugs for three different language
>> (Java, Python, and Go for now).
>>
>> Best,
>> Aljoscha
>>
>>
>> On 29. Apr 2018, at 20:46, Eugene Kirpichov  wrote:
>>
>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>
>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath 
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>> this will benefit many Python SDK users and will serve as a good example
>>> for recently added Splittable DoFn API (Fn API support which will allow all
>>> runners to use Python Splittable DoFn is in active development).  I created
>>> a document [1] that makes the case for adding this connector and compares
>>> the performance of available Python Kafka client libraries. Also I created
>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>> implement a Kafka source. I would greatly appreciate any feedback related to
>>> this.
>>>
>>> [1]
>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>> [2]
>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>
>>> Thanks,
>>> Cham
>>>
>>
>>


Re: Running Nexmark with PubSub issue

2018-04-13 Thread Raghu Angadi
Hi Alexey,

I am not sure if I can get to it soon. If you are planning to run nexmark
with Kafka, would you mind porting changes from my branch to a new PR? I
will be happy to review and get it merged.

Raghu.

On Thu, Apr 12, 2018 at 6:14 AM Alexey Romanenko 
wrote:

> Raghu, Kenneth,
> Yes, creating a separate class instead of inner one helped to overcome
> this issue with serialisation. Seems like this a bug in NexmarkLauncher, so
> I’ll create a jira for this.
> Thank you for help with this.
>
> Btw, Raghu, are you going to submit a PR from your branch? I think this is
> exactly what BEAM-4048 <https://issues.apache.org/jira/browse/BEAM-4048> is
> about (with some adjustment according to what already was merged, for sure).
>
> WBR,
> Alexey
>
> On 11 Apr 2018, at 19:53, Kenneth Knowles  wrote:
>
> Yea, this is a common issue with serializable anonymous inner classes in
> general. It would be nice if Beam Java could have an overarching solution
> to limiting the closure to things actually touched.
>
> Kenn
>
> On Wed, Apr 11, 2018 at 10:30 AM Raghu Angadi  wrote:
>
>> I noticed it too while adding KafkaIO support for Nexmark (this was in
>> parallel to another PR for KafkaIO that got merged recently).
>> The anonymous inner class for DoFn is not serializable. I moved it  to a
>> static class in my branch, but didn't test it yet :
>> https://github.com/rangadi/beam/commit/b49a9eda9f6170ec0979f54438223ab8d2cd466f#diff-c9c486a395311f6f9ee8b9be0a92d907R756
>> .
>>
>> One issue with Nexmark benchmarks with sources like Pubsub and KafkaIO is
>> how the tests are terminated...
>> Raghu.
>>
>> On Wed, Apr 11, 2018 at 9:54 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> For the moment, I'm working on BEAM-4048
>>> <https://issues.apache.org/jira/browse/BEAM-4048> to add Kafka
>>> source/sink support with different modes to Nexmark, like it has for PubSub
>>> (*SUBSCRIBE_ONLY, PUBLISH_ONLY* and *COMBINED*). It seems that the code
>>> will be similar to what we have for PubSub so I wanted to do some
>>> refactoring and reuse already existed code for Kafka. So, to make sure that
>>> nothing will be broken after this refactoring, I want to run Nexmark with
>>> PubSub source/sink in different modes before and after.
>>>
>>> I tried to do this with PubSub emulator but I have very strange issue
>>> related to pipeline serialisation - it tries to serialise
>>> *NexmarkLauncher*, see this output
>>> <https://gist.github.com/aromanenko-dev/a42a0a011f9fbd10b8443bf2ca38c68a>
>>>
>>> Could anyone point me out if I do something wrong running this Nexmark
>>> pipeline and how properly to do it with PubSub as source/sink?
>>>
>>> WBR,
>>> Alexey
>>>
>>
>


Re: Running Nexmark with PubSub issue

2018-04-11 Thread Raghu Angadi
I noticed it too while adding KafkaIO support for Nexmark (this was in
parallel to another PR for KafkaIO that got merged recently).
The anonymous inner class for DoFn is not serializable. I moved it to a
static class in my branch, but didn't test it yet:
https://github.com/rangadi/beam/commit/b49a9eda9f6170ec0979f54438223ab8d2cd466f#diff-c9c486a395311f6f9ee8b9be0a92d907R756
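
For context, a sketch of the difference; the class and method names here are
made up, not the actual Nexmark code:

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical stand-in for NexmarkLauncher, which holds non-serializable state.
  class LauncherLike {
    PCollection<String> passThrough(PCollection<String> in) {
      // Anonymous DoFn: implicitly captures LauncherLike.this, so serialization fails.
      return in.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void process(ProcessContext c) {
          c.output(c.element());
        }
      }));
    }

    // Static nested DoFn: no hidden reference to the enclosing instance, serializes cleanly.
    static class PassThroughFn extends DoFn<String, String> {
      @ProcessElement
      public void process(ProcessContext c) {
        c.output(c.element());
      }
    }
  }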

One issue with Nexmark benchmarks with sources like Pubsub and KafkaIO is
how the tests are terminated...
Raghu.

On Wed, Apr 11, 2018 at 9:54 AM Alexey Romanenko 
wrote:

> Hi all,
>
> For the moment, I'm working on BEAM-4048
>  to add Kafka
> source/sink support with different modes to Nexmark, like it has for PubSub
> (*SUBSCRIBE_ONLY, PUBLISH_ONLY* and *COMBINED*). It seems that the code
> will be similar to what we have for PubSub so I wanted to do some
> refactoring and reuse already existed code for Kafka. So, to make sure that
> nothing will be broken after this refactoring, I want to run Nexmark with
> PubSub source/sink in different modes before and after.
>
> I tried to do this with PubSub emulator but I have very strange issue
> related to pipeline serialisation - it tries to serialise
> *NexmarkLauncher*, see this output
> 
>
> Could anyone point me out if I do something wrong running this Nexmark
> pipeline and how properly to do it with PubSub as source/sink?
>
> WBR,
> Alexey
>


Re: About the Gauge metric API

2018-04-06 Thread Raghu Angadi
I am not opposed to removing other data types, though they are an extra
convenience for users.

In Scott's example above, if the metric is a counter, what are the
guarantees provided? E.g. would it match the global count using GBK? If
yes, then gauges (especially per-key gauges) can be very useful too (e.g.
backlog for each Kafka partition/split).
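
As a concrete illustration of counters vs. gauges in a DoFn (a sketch; the
names are made up):

  import org.apache.beam.sdk.metrics.Counter;
  import org.apache.beam.sdk.metrics.Gauge;
  import org.apache.beam.sdk.metrics.Metrics;
  import org.apache.beam.sdk.transforms.DoFn;

  class RecordFn extends DoFn<String, String> {
    // Counters are summed across workers, so this yields a global element count.
    private final Counter elements = Metrics.counter(RecordFn.class, "elements");
    // A gauge keeps only the latest reported value, so values from parallel
    // workers overwrite each other unless the metrics backend labels them.
    private final Gauge backlog = Metrics.gauge(RecordFn.class, "backlog");

    @ProcessElement
    public void processElement(ProcessContext c) {
      elements.inc();
      backlog.set(c.element().length());  // stand-in for a real per-split backlog value
      c.output(c.element());
    }
  }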

On Fri, Apr 6, 2018 at 10:01 AM Robert Bradshaw  wrote:

> A String API makes it clear(er) that the values will not be aggregated in
> any way across workers. I don't think retaining both APIs (except for
> possibly some short migration period) is worthwhile. On another note, I still
> find the distributed gauge API to be a bit odd in general.
>
> On Fri, Apr 6, 2018 at 9:46 AM Raghu Angadi  wrote:
>
>> I would be in favor of replacing the existing Gauge.set(long) API with
>>> the String version and removing the old one. This would be a breaking
>>> change. However this is a relatively new API and is still marked
>>> @Experimental. Keeping the old API would retain the potential confusion.
>>> It's better to simplify the API surface: having two APIs makes it less
>>> clear which one users should choose.
>>
>>
>> Supporting additional data types sounds good. But the above states string
>> API will replace the existing API. I do not see how string API makes the
>> semantics more clear.  Semantically both are same to the user.
>>
>> On Fri, Apr 6, 2018 at 9:31 AM Pablo Estrada  wrote:
>>
>>> Hi Ben : D
>>>
>>> Sure, that's reasonable. And perhaps I started the discussion in the
>>> wrong direction. I'm not questioning the utility of Gauge metrics.
>>>
>>> What I'm saying is that Beam only supports integers, but Gauges are
>>> aggregated by dropping old values depending on their update times; so it
>>> might be desirable to not restrict the data type to just integers.
>>>
>>> -P.
>>>
>>> On Fri, Apr 6, 2018 at 9:19 AM Ben Chambers 
>>> wrote:
>>>
>>>> See for instance how gauge metrics are handled in Prometheus, Datadog
>>>> and Stackdriver monitoring. Gauges are perfect for use in distributed
>>>> systems, they just need to be properly labeled. Perhaps we should apply a
>>>> default tag or allow users to specify one.
>>>>
>>>> On Fri, Apr 6, 2018, 9:14 AM Ben Chambers  wrote:
>>>>
>>>>> Some metrics backend label the value, for instance with the worker
>>>>> that sent it. Then the aggregation is latest per label. This makes it
>>>>> useful for holding values such as "memory usage" that need to hold current
>>>>> value.
>>>>>
>>>>> On Fri, Apr 6, 2018, 9:00 AM Scott Wegner  wrote:
>>>>>
>>>>>> +1 on the proposal to support a "String" gauge.
>>>>>>
>>>>>> To expand a bit, the current API doesn't make it clear that the gauge
>>>>>> value is based on local state. If a runner chooses to parallelize a DoFn
>>>>>> across many workers, each worker will have its own local Gauge metric and
>>>>>> its updates will overwrite other values. For example, from the API it 
>>>>>> looks
>>>>>> like you could use a gauge to implement your own element count metric:
>>>>>>
>>>>>> long count = 0;
>>>>>> @ProcessElement
>>>>>> public void processElement(ProcessContext c) {
>>>>>>   myGauge.set(++count);
>>>>>>   c.output(c.element());
>>>>>> }
>>>>>>
>>>>>> This looks correct, but each worker has their own local 'count'
>>>>>> field, and gauge metric updates from parallel workers will overwrite each
>>>>>> other rather than get aggregated. So the final value would be "the number
>>>>>> of elements processed on one of the workers". (The correct implementation
>>>>>> uses a Counter metric).
>>>>>>
>>>>>> I would be in favor of replacing the existing Gauge.set(long) API
>>>>>> with the String version and removing the old one. This would be a 
>>>>>> breaking
>>>>>> change. However this is a relatively new API and is still marked
>>>>>> @Experimental. Keeping the old API would retain the potential confusion.
>>>>>> It's better to simplify the API surface: having two APIs makes it less

Re: About the Gauge metric API

2018-04-06 Thread Raghu Angadi
>
> I would be in favor of replacing the existing Gauge.set(long) API with the
> String version and removing the old one. This would be a breaking change.
> However this is a relatively new API and is still marked @Experimental.
> Keeping the old API would retain the potential confusion. It's better to
> simplify the API surface: having two APIs makes it less clear which one
> users should choose.


Supporting additional data types sounds good. But the above states that the
string API will replace the existing API. I do not see how a string API makes
the semantics any clearer. Semantically both are the same to the user.

On Fri, Apr 6, 2018 at 9:31 AM Pablo Estrada  wrote:

> Hi Ben : D
>
> Sure, that's reasonable. And perhaps I started the discussion in the wrong
> direction. I'm not questioning the utility of Gauge metrics.
>
> What I'm saying is that Beam only supports integers, but Gauges are
> aggregated by dropping old values depending on their update times; so it
> might be desirable to not restrict the data type to just integers.
>
> -P.
>
> On Fri, Apr 6, 2018 at 9:19 AM Ben Chambers  wrote:
>
>> See for instance how gauge metrics are handled in Prometheus, Datadog and
>> Stackdriver monitoring. Gauges are perfect for use in distributed systems,
>> they just need to be properly labeled. Perhaps we should apply a default
>> tag or allow users to specify one.
>>
>> On Fri, Apr 6, 2018, 9:14 AM Ben Chambers  wrote:
>>
>>> Some metrics backend label the value, for instance with the worker that
>>> sent it. Then the aggregation is latest per label. This makes it useful for
>>> holding values such as "memory usage" that need to hold current value.
>>>
>>> On Fri, Apr 6, 2018, 9:00 AM Scott Wegner  wrote:
>>>
 +1 on the proposal to support a "String" gauge.

 To expand a bit, the current API doesn't make it clear that the gauge
 value is based on local state. If a runner chooses to parallelize a DoFn
 across many workers, each worker will have its own local Gauge metric and
 its updates will overwrite other values. For example, from the API it looks
 like you could use a gauge to implement your own element count metric:

 long count = 0;
 @ProcessElement
 public void processElement(ProcessContext c) {
   myGauge.set(++count);
   c.output(c.element());
 }

 This looks correct, but each worker has their own local 'count' field,
 and gauge metric updates from parallel workers will overwrite each other
 rather than get aggregated. So the final value would be "the number of
 elements processed on one of the workers". (The correct implementation uses
 a Counter metric).

 I would be in favor of replacing the existing Gauge.set(long) API with
 the String version and removing the old one. This would be a breaking
 change. However this is a relatively new API and is still marked
 @Experimental. Keeping the old API would retain the potential confusion.
 It's better to simplify the API surface: having two APIs makes it less
 clear which one users should choose.

 On Fri, Apr 6, 2018 at 8:28 AM Pablo Estrada 
 wrote:

> Hello all,
> As I was working on adding support for Gauges in Dataflow, some noted
> that Gauge is a fairly unusual kind of metric for a distributed
> environment, since many workers will report different values and stomp on
> each other's all the time.
>
> We also looked at Flink and Dropwizard Gauge metrics [1][2], and we
> found that these use generics, and Flink explicitly mentions that a
> toString implementation is required[3].
>
> With that in mind, I'm thinking that it might make sense to 1) expand
> Gauge to support string values (keep int-based API for backwards
> compatibility), and migrate it to use string behind the covers.
>
> What does everyone think about this?
>
> Best
> -P.
>
> 1 -
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#metric-types
> 2 - https://metrics.dropwizard.io/3.1.0/manual/core/#gauges
> 3 -
> https://github.com/apache/flink/blob/master/docs/monitoring/metrics.md#gauge
> JIRA issue for Gauge metrics -
> https://issues.apache.org/jira/browse/BEAM-1616
> --
> Got feedback? go/pabloem-feedback
> 
>
 --


 Got feedback? http://go/swegner-feedback

>>> --
> Got feedback? go/pabloem-feedback
> 
>


Re: "Radically modular data ingestion APIs in Apache Beam" @ Strata - slides available

2018-03-08 Thread Raghu Angadi
Terrific! Thanks Eugene. Just the slides themselves are so good, can't wait
for the video.
Do you know when the video might be available?


On Thu, Mar 8, 2018 at 12:16 PM Eugene Kirpichov 
wrote:

> Oops that's just the template I used. Thanks for noticing, will regenerate
> the PDF and reupload when I get to it.
>
> On Thu, Mar 8, 2018, 11:59 AM Dan Halperin  wrote:
>
>> Looks like it was a good talk! Why is it Google Confidential &
>> Proprietary, though?
>>
>> Dan
>>
>> On Thu, Mar 8, 2018 at 11:49 AM, Eugene Kirpichov 
>> wrote:
>>
>>> Hey all,
>>>
>>> The slides for my yesterday's talk at Strata San Jose
>>> https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>>  have
>>> been posted on the talk page. They may be of interest both to users and IO
>>> authors.
>>>
>>> Thanks.
>>>
>>
>>


Re: A 15x speed-up in local Python DirectRunner execution

2018-02-08 Thread Raghu Angadi
This is terrific news! Thanks Charles.

On Wed, Feb 7, 2018 at 5:55 PM, Charles Chen  wrote:

> Local execution of Beam pipelines on the Python DirectRunner currently
> suffers from performance issues, which makes it hard for pipeline authors
> to iterate, especially on medium to large size datasets.  We would like to
> optimize and make this a better experience for Beam users.
>
> The FnApiRunner was written as a way of leveraging the portability
> framework execution code path for local portability development. We've
> found it also provides great speedups in batch execution with no user
> changes required, so we propose to switch to use this runner by default in
> batch pipelines.  For example, WordCount on the Shakespeare dataset with a
> single CPU core now takes 50 seconds to run, compared to 12 minutes before;
> this is a 15x performance improvement that users can get for free, with
> no user pipeline changes.
>
> The JIRA for this change is here (https://issues.apache.org/
> jira/browse/BEAM-3644), and a candidate patch is available here (
> https://github.com/apache/beam/pull/4634). I have been working over the
> last month on making this an automatic drop-in replacement for the current
> DirectRunner when applicable.  Before it becomes the default, you can try
> this runner now by manually specifying apache_beam.runners.
> portability.fn_api_runner.FnApiRunner as the runner.
>
> Even with this change, local Python pipeline execution can only
> effectively use one core because of the Python GIL.  A natural next step to
> further improve performance will be to refactor the FnApiRunner to allow
> for multi-process execution.  This is being tracked here (
> https://issues.apache.org/jira/browse/BEAM-3645).
>
> Best,
>
> Charles
>


Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

2018-02-05 Thread Raghu Angadi
Hi Sushil,

That is expected behavior. If you don't have any saved checkpoint, the
pipeline would start from scratch. It does not have any connection to the
previous run.
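
For example, on the Flink runner periodic checkpointing can be turned on
through pipeline options; a sketch, with an arbitrary interval:

  import org.apache.beam.runners.flink.FlinkPipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  class EnableCheckpointing {
    public static void main(String[] args) {
      FlinkPipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
      // With checkpointing enabled, KafkaIO offsets are part of the snapshot and a
      // restarted job resumes from them instead of reading from the latest offset.
      options.setCheckpointingInterval(60_000L);  // every 60 seconds; tune as needed
      // ... build and run the pipeline with these options ...
    }
  }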

On Thu, Feb 1, 2018 at 1:29 AM, Sushil Ks  wrote:

> Hi,
>Apologies for delay in my reply,
>
> @Raghu Angadi
> This checkpoints 20 mins, as you mentioned before any
> checkpoint is created and if the pipeline restarts, it's reading from the
> latest offset.
>
> @Mingmin
> Thanks a lot for sharing your learnings, However in case of any
> *UserCodeException* while processing the element as part of ParDo after
> materializing the window, the pipeline drops the unprocessed elements and
> restarts. Is this expected from Beam?
>
>
> On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles  wrote:
>
>> Is there a JIRA filed for this? I think this discussion should live in a
>> ticket.
>>
>> Kenn
>>
>> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu  wrote:
>>
>>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner, hope my
>>> experience can help you a bit.
>>>
>>> For short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement,
>>> you need to leverage exactly-once checkpoint/savepoint in Flink. The reason
>>> is,  with `ENABLE_AUTO_COMMIT_CONFIG` KafkaIO commits offset after data is
>>> read, and once job is restarted KafkaIO reads from last_committed_offset.
>>>
>>> In my jobs, I enable external(external should be optional I think?)
>>> checkpoint on exactly-once mode in Flink cluster. When the job auto-restart
>>> on failures it doesn't lost data. In case of manually redeploy the job, I
>>> use savepoint to cancel and launch the job.
>>>
>>> Mingmin
>>>
>>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi 
>>> wrote:
>>>
>>>> How often does your pipeline checkpoint/snapshot? If the failure
>>>> happens before the first checkpoint, the pipeline could restart without any
>>>> state, in which case KafkaIO would read from latest offset. There is
>>>> probably some way to verify if pipeline is restarting from a checkpoint.
>>>>
>>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks  wrote:
>>>>
>>>>> HI Aljoscha,
>>>>>The issue is let's say I consumed 100 elements in 5
>>>>> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for
>>>>> all those elements. If there is an issue while processing element 70 in
>>>>> *ParDo *and the pipeline restarts with *UserCodeException *it's
>>>>> skipping the rest 30 elements. Wanted to know if this is expected? In case
>>>>> if you still having doubt let me know will share a code snippet.
>>>>>
>>>>> Regards,
>>>>> Sushil Ks
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> 
>>> Mingmin
>>>
>>
>>
>


Re: coder evolutions?

2018-02-05 Thread Raghu Angadi
Could you describe the 2nd issue in a bit more detail, maybe with a short
example? LengthAwareCoder in the PR adds another buffer copy
(BufferedElementCountingOutputStream already has an extra buffer copy).
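
For context, a minimal sketch of the kind of length-prefixed wrapper being
discussed; the class name is hypothetical, and the encode path shows where the
extra buffer copy comes from:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.coders.CustomCoder;
  import org.apache.beam.sdk.coders.VarIntCoder;

  class LengthPrefixedCoder<T> extends CustomCoder<T> {
    private static final VarIntCoder SIZE_CODER = VarIntCoder.of();
    private final Coder<T> inner;

    LengthPrefixedCoder(Coder<T> inner) {
      this.inner = inner;
    }

    @Override
    public void encode(T value, OutputStream out) throws IOException {
      // The extra buffer copy: the element is first serialized into a local buffer
      // so that its byte count can be written ahead of it.
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      inner.encode(value, buffer);
      SIZE_CODER.encode(buffer.size(), out);
      buffer.writeTo(out);
    }

    @Override
    public T decode(InputStream in) throws IOException {
      int length = SIZE_CODER.decode(in);
      byte[] bytes = new byte[length];
      new DataInputStream(in).readFully(bytes);
      // The inner coder now sees exactly its own bytes, however many of them it reads.
      return inner.decode(new ByteArrayInputStream(bytes));
    }
  }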

On Mon, Feb 5, 2018 at 10:34 AM, Romain Manni-Bucau 
wrote:

> Would this work for everyone - can update the pr if so:
>
> If coder is not built in
> Prefix with byte size
> Else
> Current behavior
>
> ?
>
> Le 5 févr. 2018 19:21, "Romain Manni-Bucau"  a
> écrit :
>
>> Answered inlined but I want to highlight beam is a portable API on top of
>> well known vendors API which have friendly shortcuts. So the background
>> here is to make beam at least user friendly.
>>
>> Im fine if the outcome of the discussion is coder concept is wrong or
>> something like that but Im not fine to say we dont want to solve an API
>> issue, to not say bug, of a project which has an API as added value.
>>
>> I understand the perf concern which must be balanced with the fact
>> derialization is not used for each step/transform and that currently the
>> coder API is already intrusive and heavy for dev but also not usable by
>> most existing codecs out there. Even some jaxb or plain xml flavors dont
>> work with it :(.
>>
>> Le 5 févr. 2018 18:46, "Robert Bradshaw"  a écrit :
>>
>> On Sun, Feb 4, 2018 at 6:44 AM, Romain Manni-Bucau
>>  wrote:
>> > Hi guys,
>> >
>> > I submitted a PR on coders to enhance 1. the user experience 2. the
>> > determinism and handling of coders.
>> >
>> > 1. the user experience is linked to what i sent some days ago: close
>> > handling of the streams from a coder code. Long story short I add a
>> > SkipCloseCoder which can decorate a coder and just wraps the stream
>> (input
>> > or output) in flavors skipping close() calls. This avoids to do it by
>> > default (which had my preference if you read the related thread but not
>> the
>> > one of everybody) but also makes the usage of a coder with this issue
>> easy
>> > since the of() of the coder just wraps itself in this delagating coder.
>> >
>> > 2. this one is more nasty and mainly concerns IterableLikeCoders. These
>> ones
>> > use this kind of algorithm (keep in mind they work on a list):
>> >
>> > writeSize()
>> > for all element e {
>> > elementCoder.write(e)
>> > }
>> > writeMagicNumber() // this one depends the size
>> >
>> > The decoding is symmetric so I bypass it here.
>> >
>> > Indeed all these writes (reads) are done on the same stream. Therefore
>> it
>> > assumes you read as much bytes than you write...which is a huge
>> assumption
>> > for a coder which should by contract assume it can read the stream...as
>> a
>> > stream (until -1).
>> >
>> > The idea of the fix is to change this encoding to this kind of
>> algorithm:
>> >
>> > writeSize()
>> > for all element e {
>> > writeElementByteCount(e)
>> > elementCoder.write(e)
>> > }
>> > writeMagicNumber() // still optionally
>>
>> Regardless of the backwards incompatibility issues, I'm unconvinced
>> that prefixing every element with its length is a good idea. It can
>> lead to blow-up in size (e.g. a list of ints, and it should be noted
>> that containers with lots of elements bias towards having small
>> elements) and also writeElementByteCount(e) could be very inefficient
>> for many type e (e.g. a list of lists).
>>
>>
>> What is your proposal Robert then? Current restriction is clearly a
>> blocker for portability, users, determinism and is unsafe and only
>> checkable at runtime so not something we should lead to keep.
>>
>> Alternative i thought about was to forbid implicit coders but it doesnt
>> help users.
>>
>>
>>
>> > This way on the decode size you can wrap the stream by element to
>> enforce
>> > the limitation of the byte count.
>> >
>> > Side note: this indeed enforce a limitation due to java byte limitation
>> but
>> > if you check coder code it is already here at the higher level so it is
>> not
>> > a big deal for now.
>> >
>> > In terms of implementation it uses a LengthAwareCoder which delegates to
>> > another coder the encoding and just adds the byte count before the
>> actual
>> > serialization. Not perfect but should be more than enough in terms of
>> > support and perf for beam if you think real pipelines (we try to avoid
>> > serializations or it is done on some well known points where this algo
>> > should be enough...worse case it is not a huge overhead, mainly just
>> some
>> > memory overhead).
>> >
>> >
>> > The PR is available at https://github.com/apache/beam/pull/4594. If you
>> > check you will see I put it "WIP". The main reason is that it changes
>> the
>> > encoding format for containers (lists, iterable, ...) and therefore
>> breaks
>> > python/go/... tests and the standard_coders.yml definition. Some help on
>> > that would be very welcomed.
>> >
>> > Technical side note if you wonder: UnownedInputStream doesn't even
>> allow to
>> > mark the stream so there is no real fast way to read the stream as fast
>> as
>> > possible with sta

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Raghu Angadi
On Fri, Feb 2, 2018 at 10:21 AM, Reuven Lax  wrote:

> However this code might run in streaming as well, right?
>

True. What is the best-practices recommendation to handle it? Probably the
author of the sink transform should consider the context and decide whether it
needs to be retry-tolerant when setting up the transform. I think the current
behavior of not overwriting the output would be very surprising to
unsuspecting users.
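
For reference, a minimal sketch of a retry-tolerant rename along the lines of
the pseudocode quoted further down; it only checks existence (a size/checksum
comparison could be added), and the names are made up:

  import java.io.IOException;
  import java.util.Collections;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MatchResult;
  import org.apache.beam.sdk.io.fs.ResourceId;

  class IdempotentRename {
    static void rename(ResourceId src, ResourceId dst) throws IOException {
      boolean srcExists = exists(src);
      boolean dstExists = exists(dst);
      if (!srcExists && dstExists) {
        return;  // assume an earlier attempt already completed the rename
      }
      if (!srcExists) {
        throw new IOException("Neither " + src + " nor " + dst + " exists");
      }
      // Whether an existing dst is overwritten is filesystem-dependent, which is
      // part of the ambiguity raised in this thread.
      FileSystems.copy(Collections.singletonList(src), Collections.singletonList(dst));
      FileSystems.delete(Collections.singletonList(src));
    }

    static boolean exists(ResourceId resource) throws IOException {
      MatchResult result =
          FileSystems.matchResources(Collections.singletonList(resource)).get(0);
      return result.status() == MatchResult.Status.OK;
    }
  }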


>
> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi  wrote:
>
>> In a batch pipeline, is it considered a data loss if the the stage fails
>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>> might be better to favor correctness and fail in current implementation.
>>
>>
>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw 
>> wrote:
>>
>>> You could add a step to delete all of dest before a barrier and the
>>> step that does the rename as outlined. In that case, any dest file
>>> that exists must be good.
>>>
>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
>>> wrote:
>>> > I think this is still unsafe in case exists(dst) (e.g. this is a
>>> re-run of a
>>> > pipeline) but src is missing due to some bad reason. However it's
>>> probably
>>> > better than what we have (e.g. we currently certainly don't perform
>>> checksum
>>> > checks).
>>> >
>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
>>> >>
>>> >> For GCS, I would do what I believe we already do.
>>> >> rename(src, dst):
>>> >> - if !exists(src) and exists(dst) return 0
>>> >> - if !exists(src) and !exists(dst) return error
>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>> >> return 0 else delete(dst) }
>>> >> - Start a GCS copy from src to dst.
>>> >> - Wait for GCS copy to complete.
>>> >> - delete(src)
>>> >>
>>> >> For filesystems that don't have checksum() metadata, size() can be
>>> used
>>> >> instead.
>>> >>
>>> >> I've opened a bug to track this:
>>> >> https://issues.apache.org/jira/browse/BEAM-3600
>>> >>
>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov >> >
>>> >> wrote:
>>> >>>
>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>> files
>>> >>> that are missing for more ominous reasons than just this being a
>>> non-first
>>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>>> constructing the
>>> >>> filename to be renamed, or if we somehow messed up the order of
>>> rename vs
>>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
>>> lead to
>>> >>> silent data loss (likely caught by unit tests though - so this is
>>> not a
>>> >>> super serious issue).
>>> >>>
>>> >>> Basically I just can't think of a case when I was copying files and
>>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>> copying
>>> >>> doesn't exist" - the option exists only because we couldn't come up
>>> with
>>> >>> another way to implement idempotent rename on GCS.
>>> >>>
>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>> >>>
>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
>>> >>>>
>>> >>>> Eugene, if I get this right, you're saying that
>>> IGNORE_MISSING_FILES is
>>> >>>> unsafe because it will skip (src, dst) pairs where neither exist?
>>> (it only
>>> >>>> looks if src exists)
>>> >>>>
>>> >>>> For GCS, we can construct a safe retryable rename() operation,
>>> assuming
>>> >>>> that copy() and delete() are atomic for a single file or pair of
>>> files.
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
>>> wrote:
>>> >>>>>
>>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>> >>>>>  wrote:

Re: Filesystems.copy and .rename behavior

2018-02-02 Thread Raghu Angadi
In a batch pipeline, is it considered data loss if the stage fails (assuming
it does not set IGNORE_MISSING_FILES and fails hard)? If not, it might be
better to favor correctness and fail in the current implementation.

On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw  wrote:

> You could add a step to delete all of dest before a barrier and the
> step that does the rename as outlined. In that case, any dest file
> that exists must be good.
>
> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov 
> wrote:
> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run
> of a
> > pipeline) but src is missing due to some bad reason. However it's
> probably
> > better than what we have (e.g. we currently certainly don't perform
> checksum
> > checks).
> >
> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri  wrote:
> >>
> >> For GCS, I would do what I believe we already do.
> >> rename(src, dst):
> >> - if !exists(src) and exists(dst) return 0
> >> - if !exists(src) and !exists(dst) return error
> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
> >> return 0 else delete(dst) }
> >> - Start a GCS copy from src to dst.
> >> - Wait for GCS copy to complete.
> >> - delete(src)
> >>
> >> For filesystems that don't have checksum() metadata, size() can be used
> >> instead.
> >>
> >> I've opened a bug to track this:
> >> https://issues.apache.org/jira/browse/BEAM-3600
> >>
> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov 
> >> wrote:
> >>>
> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
> files
> >>> that are missing for more ominous reasons than just this being a
> non-first
> >>> attempt at renaming src to dst. E.g. if there was a bug in
> constructing the
> >>> filename to be renamed, or if we somehow messed up the order of rename
> vs
> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead
> to
> >>> silent data loss (likely caught by unit tests though - so this is not a
> >>> super serious issue).
> >>>
> >>> Basically I just can't think of a case when I was copying files and
> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
> copying
> >>> doesn't exist" - the option exists only because we couldn't come up
> with
> >>> another way to implement idempotent rename on GCS.
> >>>
> >>> What's your idea of how a safe retryable GCS rename() could work?
> >>>
> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri  wrote:
> >>>>
> >>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES
> is
> >>>> unsafe because it will skip (src, dst) pairs where neither exist? (it
> only
> >>>> looks if src exists)
> >>>>
> >>>> For GCS, we can construct a safe retryable rename() operation,
> assuming
> >>>> that copy() and delete() are atomic for a single file or pair of
> files.
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi 
> wrote:
> >>>>>
> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
> >>>>>  wrote:
> >>>>>>
> >>>>>> As far as I know, the current implementation of file sinks is the
> only
> >>>>>> reason why the flag IGNORE_MISSING for copying even exists -
> there's no
> >>>>>> other compelling reason to justify it. We implement "rename" as
> "copy, then
> >>>>>> delete" (in a single DoFn), so for idempodency of this operation we
> need to
> >>>>>> ignore the copying of a non-existent file.
> >>>>>>
> >>>>>> I think the right way to go would be to change the implementation of
> >>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the
> middle, so
> >>>>>> it's made of 2 individually idempotent operations:
> >>>>>> 1) copy, which would fail if input is missing, and would overwrite
> >>>>>> output if it exists
> >>>>>> -- reshuffle --
> >>>>>> 2) delete, which would not fail if input is missing.
> >>>>>
> >>>>>
> >>>>> Something like this is

RFC: Better event time and source watermark in KafkaIO

2018-02-01 Thread Raghu Angadi
Kafka has supported server-side and client-side timestamps since version 0.10.1.
KafkaIO in Beam can provide a much better watermark, especially for topics
with server-side timestamps. The default implementation currently just uses
processing time for the event time and watermark, which is not very useful.

I wrote a short doc [1] about the proposal. Your feedback is welcome. I am
planning to work on it, and don't mind guiding anyone else who is interested
(it is fairly accessible for newcomers).

TL;DR:
   *server-side timestamp*: It monotonically increases within a Kafka
partition. We can provide a near-perfect watermark: min(timestamp of the
latest record consumed on a partition).
   *client-side / custom timestamp*: The watermark is min(timestamp over the
last few seconds), similar to PubsubIO in Beam. This is not great, but we will
let users provide tighter bounds or supply an entirely custom implementation.
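
A sketch of what a server-side policy could look like, written against the
TimestampPolicy shape that was later added to KafkaIO; treat the exact API as
an assumption here:

  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.io.kafka.TimestampPolicy;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.joda.time.Instant;

  class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> {
    private Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
      // LogAppendTime is assigned by the broker and is monotonic within a partition.
      lastTimestamp = new Instant(record.getTimestamp());
      return lastTimestamp;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
      // Monotonic timestamps make "timestamp of the latest record" a tight watermark.
      return lastTimestamp;
    }
  }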

Thanks,
Raghu.

[1]
https://docs.google.com/document/d/1DyWcLJpALRoUfvYUbiPCDVikYb_Xz2X7Co2aDUVVd4I/edit?usp=sharing


Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Raghu Angadi
On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov 
wrote:

> As far as I know, the current implementation of file sinks is the only
> reason why the flag IGNORE_MISSING for copying even exists - there's no
> other compelling reason to justify it. We implement "rename" as "copy, then
> delete" (in a single DoFn), so for idempodency of this operation we need to
> ignore the copying of a non-existent file.
>
> I think the right way to go would be to change the implementation of
> renaming to have a @RequiresStableInput (or reshuffle) in the middle, so
> it's made of 2 individually idempotent operations:
> 1) copy, which would fail if input is missing, and would overwrite output
> if it exists
> -- reshuffle --
> 2) delete, which would not fail if input is missing.
>

Something like this is needed only in streaming, right?

Raghu.
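
A sketch of the two-step rename quoted above, with a Reshuffle standing in as
the barrier between the two idempotent steps; class names are made up, and
overwrite behavior of copy is filesystem-dependent:

  import java.io.IOException;
  import java.util.Collections;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
  import org.apache.beam.sdk.io.fs.ResourceId;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  class TwoStepRename {
    // Step 1: copy src -> dst; fails if src is missing, overwrites dst if supported.
    static class CopyFn extends DoFn<KV<String, String>, KV<String, String>> {
      @ProcessElement
      public void process(ProcessContext c) throws IOException {
        ResourceId src = FileSystems.matchSingleFileSpec(c.element().getKey()).resourceId();
        ResourceId dst = FileSystems.matchNewResource(c.element().getValue(), false /* isDirectory */);
        FileSystems.copy(Collections.singletonList(src), Collections.singletonList(dst));
        c.output(c.element());
      }
    }

    // Step 2: delete src; a missing src just means a previous attempt already got here.
    static class DeleteFn extends DoFn<KV<String, String>, Void> {
      @ProcessElement
      public void process(ProcessContext c) throws IOException {
        ResourceId src = FileSystems.matchNewResource(c.element().getKey(), false /* isDirectory */);
        FileSystems.delete(Collections.singletonList(src), StandardMoveOptions.IGNORE_MISSING_FILES);
      }
    }

    static PCollection<Void> expand(PCollection<KV<String, String>> renames) {
      return renames
          .apply("Copy", ParDo.of(new CopyFn()))
          .apply(Reshuffle.viaRandomKey())  // barrier: all copies are stable before deletes run
          .apply("DeleteSources", ParDo.of(new DeleteFn()));
    }
  }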


> That way first everything is copied (possibly via multiple attempts), and
> then old files are deleted (possibly via multiple attempts).
>
> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri  wrote:
>
>> I agree that overwriting is more in line with user expectations.
>> I believe that the sink should not ignore errors from the filesystem
>> layer. Instead, the FileSystem API should be more well defined.
>> Examples: rename() and copy() should overwrite existing files at the
>> destination, copy() should have an ignore_missing flag.
>>
>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi  wrote:
>>
>>> Original mail mentions that output from second run of word_count is
>>> ignored. That does not seem as safe as ignoring error from a second attempt
>>> of a step. How do we know second run didn't run on different output?
>>> Overwriting seems more accurate than ignoring. Does handling this error at
>>> sink level distinguish between the two (another run vs second attempt)?
>>>
>>>
>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:
>>>
>>>> Yeah, another round of refactoring is due to move the rename via
>>>> copy+delete logic up to the file-based sink level.
>>>>
>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
>>>> wrote:
>>>>
>>>>> Good point. There's always the chance of step that performs final
>>>>> rename being retried. So we'll have to ignore this error at the sink 
>>>>> level.
>>>>> We don't necessarily have to do this at the FileSystem level though. I
>>>>> think the proper behavior might be to raise an error for the rename at the
>>>>> FileSystem level if the destination already exists (or source doesn't
>>>>> exist) while ignoring that error (and possibly logging a warning) at the
>>>>> sink level.
>>>>>
>>>>> - Cham
>>>>>
>>>>>
>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax  wrote:
>>>>>
>>>>>> I think the idea was to ignore "already exists" errors. The reason
>>>>>> being that any step in Beam can be executed multiple times, including the
>>>>>> rename step. If the rename step gets run twice, the second run should
>>>>>> succeed vacuously.
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri  wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed
>>>>>>> some behaviors which are surprising. I wanted to know if these behaviors
>>>>>>> are known and intended.
>>>>>>>
>>>>>>> 1. When renaming files during finalize_write, rename errors are
>>>>>>> ignored
>>>>>>> <https://github.com/apache/beam/blob/3aa2bef87c93d2844dd7c8dbaf45db75ec607792/sdks/python/apache_beam/io/filebasedsink.py#L232>.
>>>>>>> For example, if I run wordcount twice using HDFS code I get a warning 
>>>>>>> the
>>>>>>> second time because the file already exists:
>>>>>>>
>>>>>>> WARNING:root:Rename not successful: hdfs://beam-temp-counts2-
>>>>>>> 7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>> -> hdfs://counts2-0-of-1, libhdfs error in renaming
>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
>>>>>>> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 to
>>>>>>> hdfs://counts2-0-of-1 with exceptions Unable to rename
>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
>>>>>>> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2' to
>>>>>>> '/counts2-0-of-1'.
>>>>>>>
>>>>>>> For GCS and local files there are no rename errors (in this case),
>>>>>>> since the rename operation silently overwrites existing destination 
>>>>>>> files.
>>>>>>> However, blindly ignoring these errors might make the pipeline to report
>>>>>>> success even though output files are missing.
>>>>>>>
>>>>>>> 2. Output files (--ouput) overwrite existing files.
>>>>>>>
>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
>>>>>>> doesn't use Filesystem.Rename().
>>>>>>>
>>>>>>> Thanks,
>>>>>>> - Udi
>>>>>>>
>>>>>>
>>>>>>
>>>


Re: Filesystems.copy and .rename behavior

2018-01-31 Thread Raghu Angadi
The original mail mentions that output from a second run of word_count is
ignored. That does not seem as safe as ignoring an error from a second attempt
of a step. How do we know the second run didn't produce different output?
Overwriting seems more accurate than ignoring. Does handling this error at the
sink level distinguish between the two (another run vs. a second attempt)?


On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri  wrote:

> Yeah, another round of refactoring is due to move the rename via
> copy+delete logic up to the file-based sink level.
>
> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath 
> wrote:
>
>> Good point. There's always the chance of step that performs final rename
>> being retried. So we'll have to ignore this error at the sink level. We
>> don't necessarily have to do this at the FileSystem level though. I think
>> the proper behavior might be to raise an error for the rename at the
>> FileSystem level if the destination already exists (or source doesn't
>> exist) while ignoring that error (and possibly logging a warning) at the
>> sink level.
>>
>> - Cham
>>
>>
>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax  wrote:
>>
>>> I think the idea was to ignore "already exists" errors. The reason being
>>> that any step in Beam can be executed multiple times, including the rename
>>> step. If the rename step gets run twice, the second run should succeed
>>> vacuously.
>>>
>>>
>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri  wrote:
>>>
 Hi,
 I've been working on HDFS code for the Python SDK and I've noticed some
 behaviors which are surprising. I wanted to know if these behaviors are
 known and intended.

 1. When renaming files during finalize_write, rename errors are ignored
 .
 For example, if I run wordcount twice using HDFS code I get a warning the
 second time because the file already exists:

 WARNING:root:Rename not successful: hdfs://beam-temp-counts2-
 7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
 -> hdfs://counts2-0-of-1, libhdfs error in renaming
 hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 to
 hdfs://counts2-0-of-1 with exceptions Unable to rename
 '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2' to
 '/counts2-0-of-1'.

 For GCS and local files there are no rename errors (in this case),
 since the rename operation silently overwrites existing destination files.
 However, blindly ignoring these errors might make the pipeline to report
 success even though output files are missing.

 2. Output files (--ouput) overwrite existing files.

 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK doesn't
 use Filesystem.Rename().

 Thanks,
 - Udi

>>>
>>>


Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner

2018-01-10 Thread Raghu Angadi
How often does your pipeline checkpoint/snapshot? If the failure happens
before the first checkpoint, the pipeline could restart without any state, in
which case KafkaIO would read from the latest offset. There is probably some
way to verify whether the pipeline is restarting from a checkpoint.

On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks  wrote:

> HI Aljoscha,
>The issue is let's say I consumed 100 elements in 5
> mins Fixed Window with *GroupByKey* and later I applied *ParDO* for all
> those elements. If there is an issue while processing element 70 in
> *ParDo *and the pipeline restarts with *UserCodeException *it's skipping
> the rest 30 elements. Wanted to know if this is expected? In case if you
> still having doubt let me know will share a code snippet.
>
> Regards,
> Sushil Ks
>


Re: A personal update

2017-12-13 Thread Raghu Angadi
Great to have you back Davor! New venture sounds like terrific news for
Apache Beam. All the best.

On Wed, Dec 13, 2017 at 9:33 AM, Thomas Groh  wrote:

> It's good to see you around. Welcome back.
>
> On Wed, Dec 13, 2017 at 8:43 AM, Chamikara Jayalath 
> wrote:
>
>> Welcome back :)
>>
>> - Cham
>>
>> On Wed, Dec 13, 2017 at 8:41 AM Jason Kuster 
>> wrote:
>>
>>> Glad to have you back. :)
>>>
>>> On Wed, Dec 13, 2017 at 8:32 AM, Eugene Kirpichov 
>>> wrote:
>>>
 Happy to see you return, and thank you again for all you've done so far!

 On Wed, Dec 13, 2017 at 10:24 AM Aljoscha Krettek 
 wrote:

> Welcome back! :-)
>
> > On 13. Dec 2017, at 15:42, Ismaël Mejía  wrote:
> >
> > Hello Davor, great to know you are going to continue contributing to
> > the project. Welcome back and best of wishes for this new phase !
> >
> > On Wed, Dec 13, 2017 at 3:12 PM, Kenneth Knowles 
> wrote:
> >> Great to have you back!
> >>
> >> On Tue, Dec 12, 2017 at 11:20 PM, Robert Bradshaw <
> rober...@google.com>
> >> wrote:
> >>>
> >>> Great to hear from you again, and really happy you're sticking
> around!
> >>>
> >>> - Robert
> >>>
> >>>
> >>> On Tue, Dec 12, 2017 at 10:47 PM, Ahmet Altay 
> wrote:
>  Welcome back! Looking forward to your contributions.
> 
>  Ahmet
> 
>  On Tue, Dec 12, 2017 at 10:05 PM, Jesse Anderson
>  
>  wrote:
> >
> > Congrats!
> >
> >
> > On Wed, Dec 13, 2017, 5:54 AM Jean-Baptiste Onofré <
> j...@nanthrax.net>
> > wrote:
> >>
> >> Hi Davor,
> >>
> >> welcome back !!
> >>
> >> It's really great to see you back active in the Beam community.
> We
> >> really
> >> need you !
> >>
> >> I'm so happy !
> >>
> >> Regards
> >> JB
> >>
> >> On 12/13/2017 05:51 AM, Davor Bonaci wrote:
> >>> My dear friends,
> >>> As many of you have noticed, I’ve been visibly absent from the
> >>> project
> >>> for a
> >>> little while. During this time, a great number of you kept
> reaching
> >>> out, and for
> >>> that I’m deeply humbled and grateful to each and every one of
> you.
> >>>
> >>> I needed some time for personal reflection, which led to a
> >>> transition
> >>> in my
> >>> professional life. As things have settled, I’m happy to again
> be
> >>> working among
> >>> all of you, as we propel this project forward. I plan to be
> active
> >>> in
> >>> the
> >>> future, but perhaps not quite full-time as I was before.
> >>>
> >>> In the near term, I’m working on getting the report to the
> Board
> >>> completed, as
> >>> well as framing the discussion about the project state and
> vision
> >>> going
> >>> forwards. Additionally, I’ll make sure that we foster healthy
> >>> community
> >>> culture
> >>> and operate in the Apache Way.
> >>>
> >>> For those who are curious, I’m happy to say that I’m starting a
> >>> company
> >>> building
> >>> products related to Beam, along with several other members of
> this
> >>> community and
> >>> authors of this technology. I’ll share more on this next year,
> but
> >>> until then if
> >>> you have a data processing problem or an Apache Beam question,
> I’d
> >>> love
> >>> to hear
> >>> from you ;-).
> >>>
> >>> Thanks -- and so happy to be back!
> >>>
> >>> Davor
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> 
> 
> >>
> >>
>
>
>>>
>>>
>>> --
>>> ---
>>> Jason Kuster
>>> Apache Beam / Google Cloud Dataflow
>>>
>>
>


Re: Guarding against unsafe triggers at construction time

2017-12-04 Thread Raghu Angadi
I have been thinking about this since last week's discussions about buffering
in sinks and was reading https://s.apache.org/beam-sink-triggers. It says
BEAM-3169 is an example of a bug caused by a misunderstanding of trigger
semantics.

  - I would like to know which part of the (documented) trigger semantics was
misunderstood. I personally didn't think about how triggers are
propagated/enforced downstream; I always thought of them in the context of the
current step. The JavaDoc for triggers does not mention it explicitly.

  - In addition, the fix for BEAM-3169 seems to essentially use Reshuffle,
which places elements in their own windows. KafkaIO's exactly-once sink does
something similar. Is that a violation of triggers set upstream (as mentioned
in a recent dev thread)?

I guess I am hoping for expanded JavaDoc that describes trigger semantics in
much more detail (preferably with examples), so that users and developers can
understand them better and not suffer from subtle bugs. Best practices are
useful, of course, and having users actually understand the right semantics is
also very useful.
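
To make the terminating-trigger pitfall concrete, a small sketch; the window
size and lateness are arbitrary:

  import org.apache.beam.sdk.transforms.windowing.AfterPane;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Repeatedly;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.joda.time.Duration;

  class TriggerExamples {
    // A terminating top-level trigger: fires one pane per window and then drops
    // whatever else arrives for that window (the pattern the proposal would prohibit).
    static <T> Window<T> fireOnceThenDrop() {
      return Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
          .triggering(AfterPane.elementCountAtLeast(1))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes();
    }

    // The usual intent: keep firing as data arrives instead of dropping it.
    static <T> Window<T> fireRepeatedly() {
      return Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes();
    }
  }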


On Mon, Dec 4, 2017 at 3:19 PM, Eugene Kirpichov 
wrote:

> Hi,
>
> After a recent investigation of a data loss bug caused by unintuitive
> behavior of some kinds of triggers, we had a discussion about how we can
> protect against future issues like this, and I summarized it in
> https://issues.apache.org/jira/browse/BEAM-3288 . Copying here:
>
> Current Beam trigger semantics are rather confusing and in some cases
> extremely unsafe, especially if the pipeline includes multiple chained
> GBKs. One example of that is https://issues.apache.org/
> jira/browse/BEAM-3169 .
>
> There's multiple issues:
>
> The API allows users to specify terminating top-level triggers (e.g.
> "trigger a pane after receiving 1 elements in the window, and that's
> it"), but experience from user support shows that this is nearly always a
> mistake and the user did not intend to drop all further data.
>
> In general, triggers are the only place in Beam where data is being
> dropped without making a lot of very loud noise about it - a practice for
> which the PTransform style guide uses the language: "never, ever, ever do
> this".
>
> Continuation triggers are still worse. For context: continuation trigger
> is the trigger that's set on the output of a GBK and controls further
> aggregation of the results of this aggregation by downstream GBKs. The
> output shouldn't just use the same trigger as the input, because e.g. if
> the input trigger said "wait for an hour before emitting a pane", that
> doesn't mean that we should wait for another hour before emitting a result
> of aggregating the result of the input trigger. Continuation triggers try
> to simulate the behavior "as if a pane of the input propagated through the
> entire pipeline", but the implementation of individual continuation
> triggers doesn't do that. E.g. the continuation of "first N elements in
> pane" trigger is "first 1 element in pane", and if the results of a first
> GBK are further grouped by a second GBK onto more coarse key (e.g. if
> everything is grouped onto the same key), that effectively means that, of
> the keys of the first GBK, only one survives and all others are dropped
> (what happened in the data loss bug).
>
> The ultimate fix to all of these things is https://s.apache.org/beam-
> sink-triggers . However, it is a huge model change, and meanwhile we have
> to do something. The options are, in order of increasing backward
> incompatibility (but incompatibility in a "rejecting something that
> previously was accepted but extremely dangerous" kind of way):
>
>- *Make the continuation trigger of most triggers be the "always-fire"
>trigger.* Seems that this should be the case for all triggers except
>the watermark trigger. This will definitely increase safety, but lead to
>more eager firing of downstream aggregations. It also will violate a user's
>expectation that a fire-once trigger fires everything downstream only once,
>but that expectation appears impossible to satisfy safely.
>- *Make the continuation trigger of some triggers be the "invalid"
>trigger, *i.e. require the user to set it explicitly: there's in
>general no good and safe way to infer what a trigger on a second GBK
>"truly" should be, based on the trigger of the PCollection input into a
>first GBK. This is especially true for terminating triggers.
>- *Prohibit top-level terminating triggers entirely. *This will ensure
>that the only data that ever gets dropped is "droppably late" data.
>
>
> Do people think that these options are sensible?
> +Kenn Knowles  +Thomas Groh  +Ben

Re: [DISCUSS] Updating contribution guide for gitbox

2017-11-28 Thread Raghu Angadi
On Tue, Nov 28, 2017 at 11:47 AM, Thomas Weise  wrote:

>
> (a) -0 due to extra noise in the commit log
>


> (b) -1 (as standard/default) this should be done by contributor as there
> may be few situation where individual commits should be preserved
>

It is better to preserve the commit history of the PR, at least in the
committer branch (and the PR). In addition, having to force-push a squashed
commit to the remote git branch each time is quite painful. If we squash at
all, the final merge into master seems like the best place.


> (c) +1 the rebase will also record the committer (which would be merge
> commit author otherwise)
>
> In general the process should result in "merged" status for a merged PR as
> opposed to "closed" as seen often currently.
>
> Thanks,
> Thomas
>
>
>
> On Tue, Nov 28, 2017 at 11:23 AM, Kenneth Knowles  wrote:
>
>> On Tue, Nov 28, 2017 at 11:16 AM, Raghu Angadi 
>> wrote:
>>
>>> -1 for (a): no need to see all the private branch commits from
>>> contributor. It often makes me more conscious of local commits.
>>>
>>
>> I want to note that on my PRs these are not private commits. Each one is
>> a meaningful isolated change that can be rolled back and is useful to keep
>> separate when looking at `git blame` or the history of a file. I would
>> encourage every contributor to also do this. A PR is the unit of code
>> review, but the unit of meaningful change to a repository is often much
>> smaller.
>>
>> Kenn
>>
>>
>>> +1 for (b): with committer replacing the squashed commit messages with
>>> '[BEAM-jira or PRID]: Brief cut-n-paste (or longer if it contributor
>>> provided one)'.
>>> -1 for (c): This is quite painful for contributors to work with if there
>>> has been merge conflict with master. Especially for larger PRs with
>>> multiple updates.
>>>
>>> On Tue, Nov 28, 2017 at 10:24 AM, Lukasz Cwik  wrote:
>>>
>>>> Is it possible for mergebot to auto squash any fixup! and perform the
>>>> merge commit as described in (a), if so then I would vote for mergebot.
>>>>
>>>> Without mergebot, I vote:
>>>> (a) 0 I like squashing fixup!
>>>> (b) -1
>>>> (c) +1 Most of our PRs are for focused singular changes which is why I
>>>> would rather squash everything over not squashing anything
>>>>
>>>>
>>>>
>>>> On Tue, Nov 28, 2017 at 9:57 AM, Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> On Tue, Nov 28, 2017 at 9:51 AM, Ben Chambers 
>>>>> wrote:
>>>>>
>>>>>> One risk to "squash and merge" is that it may lead to commits that
>>>>>> don't have clean descriptions -- for instance, commits like "Fixing 
>>>>>> review
>>>>>> comments" will show up. If we use (a) these would also show up as 
>>>>>> separate
>>>>>> commits. It seems like there are two cases of multiple commits in a PR:
>>>>>>
>>>>>> 1. Multiple commits in a PR that have semantic meaning (eg., a PR
>>>>>> performed N steps, split across N commits). In this case, keeping the
>>>>>> descriptions and performing either a merge (if the commits are separately
>>>>>> valid) or squash (if we want the commits to become a single commit in
>>>>>> master) probably makes sense.
>>>>>>
>>>>>
>>>>> Keep 'em
>>>>>
>>>>>
>>>>>> 2. Multiple commits in a PR that just reflect the review history. In
>>>>>> this case, we should probably ask the PR author to explicitly rebase 
>>>>>> their
>>>>>> PR to have semantically meaningful commits prior to merging. (Eg., do a
>>>>>> rebase -i).
>>>>>>
>>>>>
>>>>> Ask the author to squash 'em.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>> On Tue, Nov 28, 2017 at 9:46 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> James brought up a great question in Slack, which was how should we
>>>>>>> use the merge button, illustrated [1]
>>>>>>>
>>>>>>> I want to broaden the discussion to talk about all the new

Re: [DISCUSS] Updating contribution guide for gitbox

2017-11-28 Thread Raghu Angadi
-1 for (a): no need to see all the private branch commits from the
contributor. It often makes me more conscious of local commits.
+1 for (b): with the committer replacing the squashed commit messages with
'[BEAM-jira or PR ID]: brief cut-and-paste (or longer if the contributor
provided one)'.
-1 for (c): this is quite painful for contributors to work with if there has
been a merge conflict with master, especially for larger PRs with multiple
updates.

On Tue, Nov 28, 2017 at 10:24 AM, Lukasz Cwik  wrote:

> Is it possible for mergebot to auto squash any fixup! and perform the
> merge commit as described in (a), if so then I would vote for mergebot.
>
> Without mergebot, I vote:
> (a) 0 I like squashing fixup!
> (b) -1
> (c) +1 Most of our PRs are for focused singular changes which is why I
> would rather squash everything over not squashing anything
>
>
>
> On Tue, Nov 28, 2017 at 9:57 AM, Kenneth Knowles  wrote:
>
>> On Tue, Nov 28, 2017 at 9:51 AM, Ben Chambers 
>> wrote:
>>
>>> One risk to "squash and merge" is that it may lead to commits that don't
>>> have clean descriptions -- for instance, commits like "Fixing review
>>> comments" will show up. If we use (a) these would also show up as separate
>>> commits. It seems like there are two cases of multiple commits in a PR:
>>>
>>> 1. Multiple commits in a PR that have semantic meaning (eg., a PR
>>> performed N steps, split across N commits). In this case, keeping the
>>> descriptions and performing either a merge (if the commits are separately
>>> valid) or squash (if we want the commits to become a single commit in
>>> master) probably makes sense.
>>>
>>
>> Keep 'em
>>
>>
>>> 2. Multiple commits in a PR that just reflect the review history. In
>>> this case, we should probably ask the PR author to explicitly rebase their
>>> PR to have semantically meaningful commits prior to merging. (Eg., do a
>>> rebase -i).
>>>
>>
>> Ask the author to squash 'em.
>>
>> Kenn
>>
>>
>>>
>>> On Tue, Nov 28, 2017 at 9:46 AM Kenneth Knowles  wrote:
>>>
 Hi all,

 James brought up a great question in Slack, which was how should we use
 the merge button, illustrated [1]

 I want to broaden the discussion to talk about all the new capabilities:

 1. Whether & how to use the "reviewer" field
 2. Whether & how to use the "assignee" field
 3. Whether & how to use the merge button

 My preferences are:

 1. Use the reviewer field instead of "R:" comments.
 2. Use the assignee field to keep track of who the review is blocked on
 (either the reviewer for more comments or the author for fixes)
 3. Use merge commits, but editing the commit subject line

 To expand on part 3, GitHub's merge button has three options [1]. They
 are not described accurately in the UI, as they all say "merge" when only
 one of them performs a merge. They do the following:

 (a) Merge the branch with a merge commit
 (b) Squash all the commits, rebase and push
 (c) Rebase and push without squash

 Unlike our current guide, all of these result in a "merged" status for
 the PR, so we can correctly distinguish those PRs that were actually 
 merged.

 My votes on these options are:

 (a) +1 this preserves the most information
 (b) -1 this erases the most information
 (c) -0 this is just sort of a middle ground; it breaks commit hashes,
 does not have a clear merge commit, but preserves other info

 Kenn

 [1] https://apachebeam.slack.com/messages/C1AAFJYMP/





 Kenn

>>>
>>
>


Re: makes bundle concept usable?

2017-11-17 Thread Raghu Angadi
> >> >> that its API includes the term "checkpoint". In SDF, the "checkpoint"
> >> >> captures the state of processing within a single element. If you're
> >> >> applying an SDF to 1000 elements, it will, like any other DoFn, be
> >> applied
> >> >> to each of them independently and in parallel, and you'll have 1000
> >> >> checkpoints capturing the state of processing each of these elements,
> >> >> which
> >> >> is probably not what you want.
> >> >>
> >> >> I'm afraid I still don't understand what kind of checkpoint you
> need, if
> >> >> it
> >> >> is not just deterministic grouping into batches. "Checkpoint" is a
> very
> >> >> broad term and it's very possible that everybody in this thread is
> >> talking
> >> >> about different things when saying it. So it would help if you could
> >> give
> >> >> a
> >> >> more concrete example: for example, take some IO that you think
> could be
> >> >> easier to write with your proposed API, give the contents of a
> >> >> hypothetical
> >> >> PCollection being written to this IO, give the code of a hypothetical
> >> DoFn
> >> >> implementing the write using your API, and explain what you'd expect
> to
> >> >> happen at runtime.
> >> >>
> >> >> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau
> >> >> 
> >> >> wrote:
> >> >>
> >> >>> @Eugene: yes and the other alternative of Reuven too but it is still
> >> >>> 1. relying on timers, 2. not really checkpointed
> >> >>>
> >> >>> In other words it seems all solutions are to create a chunk of size
> 1
> >> >>> and replayable to fake the lack of chunking in the framework. This
> >> >>> always implies a chunk handling outside the component (typically
> >> >>> before for an output). My point is I think IO need it in their own
> >> >>> "internal" or at least control it themselves since the chunk size is
> >> >>> part of the IO handling most of the time.
> >> >>>
> >> >>> I think JB spoke of the same "group before" trick using restrictions
> >> >>> which can work I have to admit if SDF are implemented by runners. Is
> >> >>> there a roadmap/status on that? Last time I checked SDF was a great
> >> >>> API without support :(.
> >> >>>
> >> >>>
> >> >>>
> >> >>> Romain Manni-Bucau
> >> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >>>
> >> >>>
> >> >>> 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov
> >> >>> :
> >> >>>>
> >> >>>> JB, not sure what you mean? SDFs and triggers are unrelated, and
> the
> >> >>>> post
> >> >>>> doesn't mention the word. Did you mean something else, e.g.
> >> restriction
> >> >>>> perhaps? Either way I don't think SDFs are the solution here; SDFs
> >> have
> >> >>>
> >> >>> to
> >> >>>>
> >> >>>> do with the ability to split the processing of *a single element*
> over
> >> >>>> multiple calls, whereas Romain I think is asking for repeatable
> >> grouping
> >> >>>
> >> >>> of
> >> >>>>
> >> >>>> *multiple* elements.
> >> >>>>
> >> >>>> Romain - does
> >> >>>>
> >> >>>
> >> >>>
> >> https://github.com/apache/beam/blob/master/sdks/java/
> core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> >> >>>>
> >> >>>> do what
> >> >>>> you want?
> >> >>>>
> >> >>>> On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré <
> >> j...@nanthrax.net>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> It sounds like the "Trigger" in the Splittable DoFn, no ?
> >> >>>>>
> >> >>>>> https://beam.apache.org/blog/2017/08/16/sp

Re: makes bundle concept usable?

2017-11-17 Thread Raghu Angadi
On Thu, Nov 16, 2017 at 10:40 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:
>
> [...] So it would help if you could give a
> more concrete example: for example, take some IO that you think could be
> easier to write with your proposed API, give the contents of a hypothetical
> PCollection being written to this IO, give the code of a hypothetical DoFn
> implementing the write using your API, and explain what you'd expect to
> happen at runtime.
>

Exactly what I was thinking. It would be good to see how the motivating issue
with the ES IO write in the original post would be implemented with one of the
proposed checkpoint APIs (rough pseudo code with a little javadoc to
clarify semantics where appropriate).
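
As a point of comparison (not the checkpoint API being asked about), a minimal
sketch of the batching route using the GroupIntoBatches transform referenced in
the quoted thread below. shardFor() and BulkWriteFn are hypothetical stand-ins
for the ES-specific pieces, not anything that exists in Beam:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Batch documents per shard key before issuing bulk writes.
static PCollection<Void> writeInBatches(PCollection<String> docs) {
  return docs
      // shardFor(doc): hypothetical function choosing a stable shard key.
      .apply(WithKeys.of((String doc) -> shardFor(doc))
          .withKeyType(TypeDescriptors.strings()))
      // At most 1000 elements per key and window are handed to the writer at once.
      .apply(GroupIntoBatches.<String, String>ofSize(1000))
      // BulkWriteFn: hypothetical DoFn<KV<String, Iterable<String>>, Void> that
      // issues one bulk request per batch it receives.
      .apply(ParDo.of(new BulkWriteFn()));
}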



>
> On Thu, Nov 16, 2017 at 10:33 PM Romain Manni-Bucau  >
> wrote:
>
> > @Eugene: yes and the other alternative of Reuven too but it is still
> > 1. relying on timers, 2. not really checkpointed
> >
> > In other words it seems all solutions are to create a chunk of size 1
> > and replayable to fake the lack of chunking in the framework. This
> > always implies a chunk handling outside the component (typically
> > before for an output). My point is I think IO need it in their own
> > "internal" or at least control it themselves since the chunk size is
> > part of the IO handling most of the time.
> >
> > I think JB spoke of the same "group before" trick using restrictions
> > which can work I have to admit if SDF are implemented by runners. Is
> > there a roadmap/status on that? Last time I checked SDF was a great
> > API without support :(.
> >
> >
> >
> > Romain Manni-Bucau
> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >
> >
> > 2017-11-17 7:25 GMT+01:00 Eugene Kirpichov  >:
> > > JB, not sure what you mean? SDFs and triggers are unrelated, and the
> post
> > > doesn't mention the word. Did you mean something else, e.g. restriction
> > > perhaps? Either way I don't think SDFs are the solution here; SDFs have
> > to
> > > do with the ability to split the processing of *a single element* over
> > > multiple calls, whereas Romain I think is asking for repeatable
> grouping
> > of
> > > *multiple* elements.
> > >
> > > Romain - does
> > >
> > https://github.com/apache/beam/blob/master/sdks/java/
> core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> > > do what
> > > you want?
> > >
> > > On Thu, Nov 16, 2017 at 10:19 PM Jean-Baptiste Onofré  >
> > > wrote:
> > >
> > >> It sounds like the "Trigger" in the Splittable DoFn, no ?
> > >>
> > >> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> > >>
> > >> Regards
> > >> JB
> > >>
> > >>
> > >> On 11/17/2017 06:56 AM, Romain Manni-Bucau wrote:
> > >> > it gives the fn/transform the ability to save a state - it can get
> > >> > back on "restart" / whatever unit we can use, probably runner
> > >> > dependent? Without that you need to rewrite all IO usage with
> > >> > something like the previous pattern which makes the IO not self
> > >> > sufficient and kind of makes the entry cost and usage of beam way
> > >> > further.
> > >> >
> > >> > In my mind it is exactly what jbatch/spring-batch uses but adapted
> to
> > >> > beam (stream in particular) case.
> > >> >
> > >> > Romain Manni-Bucau
> > >> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >> >
> > >> >
> > >> > 2017-11-17 6:49 GMT+01:00 Reuven Lax :
> > >> >> Romain,
> > >> >>
> > >> >> Can you define what you mean by checkpoint? What are the semantics,
> > what
> > >> >> does it accomplish?
> > >> >>
> > >> >> Reuven
> > >> >>
> > >> >> On Fri, Nov 17, 2017 at 1:40 PM, Romain Manni-Bucau <
> > >> rmannibu...@gmail.com>
> > >> >> wrote:
> > >> >>
> > >> >>> Yes, what I propose earlier was:
> > >> >>>
> > >> >>> I. checkpoint marker:
> > >> >>>
> > >> >>> @AnyBeamAnnotation
> > >> >>> @CheckpointAfter
> > >> >>> public void someHook(SomeContext ctx);
> >

Re: makes bundle concept usable?

2017-11-16 Thread Raghu Angadi
How would you define it (a rough API is fine)? Without more details, it is
not easy to see its wider applicability and feasibility in runners.

On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau 
wrote:

> This is a fair summary of the current state but also where beam can have a
> very strong added value and make big data great and smooth.
>
> Instead of this replay feature isnt checkpointing willable? In particular
> with SDF no?
>
>
> Le 16 nov. 2017 19:50, "Raghu Angadi"  a
> écrit :
>
> > Core issue here is that there is no explicit concept of 'checkpoint' in
> > Beam (UnboundedSource has a method 'getCheckpointMark' but that refers to
> > the checkpoint on the external source). Runners do checkpoint internally as
> > implementation detail. Flink's checkpoint model is entirely different
> from
> > Dataflow's and Spark's.
> >
> > @StableReplay helps, but it does not explicitly talk about a checkpoint
> by
> > design.
> >
> > If you are looking to achieve some guarantees with a sink/DoFn, I think
> it
> > is better to start with the requirements. I worked on exactly-once sink
> for
> > Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
> > elements and assign sequence numbers to elements within each shard.
> > Duplicates in replays are avoided based on these sequence numbers. DoFn
> > state API is used to buffer out-of-order replays. The implementation
> > strategy works in Dataflow but not in Flink which has a horizontal
> > checkpoint. KafkaIO checks for compatibility.
> >
> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
> > rmannibu...@gmail.com>
> > wrote:
> >
> > > Hi guys,
> > >
> > > The subject is a bit provocative but the topic is real and coming
> > > again and again with the beam usage: how a dofn can handle some
> > > "chunking".
> > >
> > > The need is to be able to commit each N records but with N not too big.
> > >
> > > The natural API for that in beam is the bundle one but bundles are not
> > > reliable since they can be very small (flink) - we can say it is "ok"
> > > even if it has some perf impacts - or too big (spark does full size /
> > > #workers).
> > >
> > > The workaround is what we see in the ES I/O: a maxSize which does an
> > > eager flush. The issue is that then the checkpoint is not respected
> > > and you can process multiple times the same records.
> > >
> > > Any plan to make this API reliable and controllable from a beam point
> > > of view (at least in a max manner)?
> > >
> > > Thanks,
> > > Romain Manni-Bucau
> > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> > >
> >
>


Re: makes bundle concept usable?

2017-11-16 Thread Raghu Angadi
The core issue here is that there is no explicit concept of 'checkpoint' in
Beam (UnboundedSource has a method 'getCheckpointMark', but that refers to
the checkpoint on the external source). Runners do checkpoint internally as an
implementation detail. Flink's checkpoint model is entirely different from
Dataflow's and Spark's.

@StableReplay helps, but it does not explicitly talk about a checkpoint by
design.

If you are looking to achieve some guarantees with a sink/DoFn, I think it
is better to start with the requirements. I worked on an exactly-once sink for
Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
elements and assign sequence numbers to elements within each shard.
Duplicates in replays are avoided based on these sequence numbers. The DoFn
state API is used to buffer out-of-order replays. The implementation
strategy works in Dataflow but not in Flink, which has a horizontal
checkpoint. KafkaIO checks for compatibility.
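
For illustration only, a highly simplified sketch of that idea (this is not the
actual KafkaIO.write().withEOS() code): after the reshard, elements arrive as
KV<shard, KV<sequenceNumber, value>>, and per-shard state drops replayed
duplicates. writeRecord() is a hypothetical stand-in for the producer call.

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class SequencedShardWriterFn extends DoFn<KV<Integer, KV<Long, String>>, Void> {

  // Next sequence number expected for this shard; survives replays.
  @StateId("nextSeq")
  private final StateSpec<ValueState<Long>> nextSeqSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext ctx,
                             @StateId("nextSeq") ValueState<Long> nextSeq) {
    Long stored = nextSeq.read();
    long expected = (stored == null) ? 0L : stored;
    long seq = ctx.element().getValue().getKey();
    if (seq < expected) {
      return; // Replayed duplicate: already written in a committed bundle.
    }
    // A complete implementation would also buffer seq > expected in a BagState
    // until the gap is filled; omitted here to keep the sketch short.
    writeRecord(ctx.element().getKey(), ctx.element().getValue().getValue());
    nextSeq.write(expected + 1);
  }
}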

On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau 
wrote:

> Hi guys,
>
> The subject is a bit provocative but the topic is real and coming
> again and again with the beam usage: how a dofn can handle some
> "chunking".
>
> The need is to be able to commit each N records but with N not too big.
>
> The natural API for that in beam is the bundle one but bundles are not
> reliable since they can be very small (flink) - we can say it is "ok"
> even if it has some perf impacts - or too big (spark does full size /
> #workers).
>
> The workaround is what we see in the ES I/O: a maxSize which does an
> eager flush. The issue is that then the checkpoint is not respected
> and you can process multiple times the same records.
>
> Any plan to make this API reliable and controllable from a beam point
> of view (at least in a max manner)?
>
> Thanks,
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>


Re: Is upgrading to Kafka Client 0.10.2.0+ in the roadmap?

2017-10-30 Thread Raghu Angadi
>
> Thanks a lot for the information. I am using Beam-2.0.
> https://github.com/apache/beam/blob/release-2.0.0/sdks/
> java/io/kafka/pom.xml#L33


I think we should move the kafka-clients dependency in KafkaIO to provided
scope to avoid potential confusion like this.

On Mon, Oct 30, 2017 at 11:10 AM, Shen Li  wrote:

> Dear All,
>
> Thanks a lot for the information. I am using Beam-2.0.
> https://github.com/apache/beam/blob/release-2.0.0/sdks/
> java/io/kafka/pom.xml#L33
>
> I have just verified that adding Kafka-Client 0.11 in the application
> pom.xml works fine for me. I can now avoid the JAAS configuration file by
> using the "java.security.auth.login.config" property.
>
> Best,
> Shen
>
> On Mon, Oct 30, 2017 at 1:41 PM, Mingmin Xu  wrote:
>
> > Hi Shen,
> >
> > Can you share which Beam version are you using? Just check master code,
> the
> > default version for Kafka is
> > `0.11.0.1`.
> > I cannot recall the usage for old versions, my
> application(2.2.0-SNAPSHOT)
> > works with a customized kafka version based on 0.10.00-SASL. What you
> need
> > to do is
> > 1). exclude the kafka-client in KafkaIO, and add your own Kafka client
> > library in pom.xml;
> > 2). add your configuration like:
> > ```
> > Map<String, Object> consumerPara = new HashMap<String, Object>();
> > //consumerPara.put(ConsumerConfig.GROUP_ID_CONFIG,
> consumerName);
> > //consumerPara.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
> > if (secureEnabled) {
> > consumerPara.put("sasl.mechanism", "IAF");
> > consumerPara.put("security.protocol", "SASL_PLAINTEXT");
> > consumerPara.put("sasl.login.class", ".");
> > consumerPara.put("sasl.callback.handler.class", "...");
> > }
> >
> >  KafkaIO.read()
> > 
> > .updateConsumerProperties(configUpdates)
> > ;
> >  ```
> >
> > Mingmin
> >
> > On Mon, Oct 30, 2017 at 10:21 AM, Raghu Angadi
>  > >
> > wrote:
> >
> > > >  https://issues.apache.org/jira/browse/BEAM-307
> > >
> > > This should be closed.
> > >
> > > On Mon, Oct 30, 2017 at 9:00 AM, Lukasz Cwik  >
> > > wrote:
> > >
> > > > There has been some discussion about getting Kafka 0.10.x working on
> > > > BEAM-307[1].
> > > >
> > > > As an immediate way to unblock yourself, modify your local copy of
> the
> > > > KafkaIO source to include setting the system property in a static
> block
> > > > before the class is loaded or before the Kafka client is instantiated
> > and
> > > > used.
> > > >
> > > > Also consider contributing to the Kafka connector to getting 0.10.x
> > > > working.
> > > >
> > > > 1: https://issues.apache.org/jira/browse/BEAM-307
> > > >
> > > > On Mon, Oct 30, 2017 at 8:14 AM, Shen Li 
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > To use KafkaIO in secure mode, I need to set
> > > > > -Djava.security.auth.login.config to point to a JAAS configuration
> > > file.
> > > > > It
> > > > > works fine for local execution. But how can I configure the
> > > > > "java.security.auth.login.config" property in the Beam app when
> the
> > > > > pipeline is submitted to a cluster/cloud-service? Even if I use a
> > ParDo
> > > > to
> > > > > set the system property, there is no guarantee that the ParDo will
> > run
> > > on
> > > > > the same server with the KafkaIO source.
> > > > >
> > > > > For this specific problem, it would be helpful to upgrade to Kafka
> > > Client
> > > > > 0.10.2.0, which provides a "sasl.jaas.config" property that can be
> > > > updated
> > > > > programmatically. Or is there any other work around?
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 85%3A+Dynamic+JAAS+
> > > > > configuration+for+Kafka+clients
> > > > >
> > > > > Thanks,
> > > > > Shen
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > 
> > Mingmin
> >
>


Re: Is upgrading to Kafka Client 0.10.2.0+ in the roadmap?

2017-10-30 Thread Raghu Angadi
>  https://issues.apache.org/jira/browse/BEAM-307

This should be closed.

On Mon, Oct 30, 2017 at 9:00 AM, Lukasz Cwik 
wrote:

> There has been some discussion about getting Kafka 0.10.x working on
> BEAM-307[1].
>
> As an immediate way to unblock yourself, modify your local copy of the
> KafkaIO source to include setting the system property in a static block
> before the class is loaded or before the Kafka client is instantiated and
> used.
>
> Also consider contributing to the Kafka connector to getting 0.10.x
> working.
>
> 1: https://issues.apache.org/jira/browse/BEAM-307
>
> On Mon, Oct 30, 2017 at 8:14 AM, Shen Li  wrote:
>
> > Hi,
> >
> > To use KafkaIO in secure mode, I need to set
> > -Djava.security.auth.login.config to point to a JAAS configuration file.
> > It
> > works fine for local execution. But how can I configure the
> > "java.security.auth.login.config" property in the Beam app when the
> > pipeline is submitted to a cluster/cloud-service? Even if I use a ParDo
> to
> > set the system property, there is no guarantee that the ParDo will run on
> > the same server with the KafkaIO source.
> >
> > For this specific problem, it would be helpful to upgrade to Kafka Client
> > 0.10.2.0, which provides a "sasl.jaas.config" property that can be
> updated
> > programmatically. Or is there any other work around?
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 85%3A+Dynamic+JAAS+
> > configuration+for+Kafka+clients
> >
> > Thanks,
> > Shen
> >
>


Re: Is upgrading to Kafka Client 0.10.2.0+ in the roadmap?

2017-10-30 Thread Raghu Angadi
Shen,

KafkaIO works with all the versions since 0.9. Just include the kafka-clients
version you like in your Maven dependencies along with the Beam dependencies.

Glad to hear Kafka 0.10.2 made it simpler to provide this config.
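
For example, something along these lines (a rough sketch; the property values
are placeholders, and it assumes kafka-clients 0.10.2+ is on the application's
classpath so that the standard sasl.jaas.config consumer property is
understood):

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaIO.Read<String, String> read =
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // These are plain Kafka consumer configs, passed through unchanged.
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
            "security.protocol", "SASL_PLAINTEXT",
            "sasl.mechanism", "PLAIN",
            "sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"user\" password=\"secret\";"));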

On Mon, Oct 30, 2017 at 8:14 AM, Shen Li  wrote:

> Hi,
>
> To use KafkaIO in secure mode, I need to set
> -Djava.security.auth.login.config to point to a JAAS configuration file.
> It
> works fine for local execution. But how can I configure the
> "java.security.auth.login.config" property in the Beam app when the
> pipeline is submitted to a cluster/cloud-service? Even if I use a ParDo to
> set the system property, there is no guarantee that the ParDo will run on
> the same server with the KafkaIO source.
>
> For this specific problem, it would be helpful to upgrade to Kafka Client
> 0.10.2.0, which provides a "sasl.jaas.config" property that can be updated
> programmatically. Or is there any other work around?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+
> configuration+for+Kafka+clients
>
> Thanks,
> Shen
>


Re: Java pre/post commit test suite breakage

2017-10-23 Thread Raghu Angadi
Regarding (1):

[4] did have a file without the Apache License. It was fixed the next day
(commit), thanks to Ken Knowles who pinged me about it.

On Mon, Oct 23, 2017 at 11:45 AM, Valentyn Tymofieiev 
wrote:

> Hi Beam-Dev,
>
> It's been >5 days since the last successful run of a
> beam_PreCommit_Java_MavenInstall build[1]  and >4 days since last
> successful run of beam_PreCommit_Java_MavenInstall[2].
>
> Looking at build logs I see following problems.
>
> 1. After October 17, postcommit builds started to fail with
>
> Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check
> (default) on project beam-parent: Too many files with unapproved license: 1
> See RAT report in: /home/jenkins/jenkins-slave/wo
> rkspace/beam_PostCommit_Java_MavenInstall/target/beam-paren
> t-2.3.0-SNAPSHOT.rat
>
> The earliest build that I see this error is Postcommit #5052 [3].
>
> This makes me suspect [4] or [5] as a breaking change, since they change
> pom files.
>
> Questions:
> - Is there a way we can reproduce this failure locally? mvn clean verify
> passes locally for me.
> - Is there a way we can see the See RAT report mentioned in the error
> log?
>
> 2. Prior to onset of #1 Java Precommit builds no longer complete within
> allotted 150 min time. Looking at [6-8] it seems the build makes consistent
> progress, but just does not finish on time. We can also see several recent
> successful builds with execution time very close to time out [9-11].
>
> I'd like to propose to increase time limit for Java precommit test suite
> from 2.5 to 4 hours. 4 hours is long time. I agree that we should
> definitely try to reduce the test execution time, and reduce flakiness.
> However we need the tests at least pass for now. If we write off failed
> test suites as 'flakes' and merge PRs without having a green test signal,
> we will have to spend more time tracing breakages such as #1.
>
> Thoughts?
>
> Thanks,
> Valentyn
>
> [1] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/
> [2] https://builds.apache.org/job/beam_PostCommit_Java_MavenInstall/
> [3] https://builds.apache.org/job/beam_PostCommit_Java_MavenInst
> all/5052/changes
>
> [4] https://github.com/apache/beam/commit/d745cc9d8cc1735d3b
> c3c67ba3e2617cb7f11a8c
> [5] https://github.com/apache/beam/commit/0d8ab6cbbc762dd9f9be1b
> 3e9a26b6c9d0bb6dc3
>
> [6] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15222/
> [7] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15195/
> [8] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15220/
>
> [9] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15009/
> [10] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15068/
> [11] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15016/
>
>


Re: kafka docs

2017-08-29 Thread Raghu Angadi
Thanks Joey.
I couldn't find the PR. Do you have a link?


On Tue, Aug 29, 2017 at 1:48 AM, Joey Baruch  wrote:

> created pr on the beam site (just reference change for now, as Jean
> Baptiste wrote)
>
> Thanks Aviem!
>
> On Tue, Aug 29, 2017 at 11:46 AM Aviem Zur  wrote:
>
> > Hi Joey.
> >
> > This would be great. Also, KafkaIO requires a specific dependency to be
> > added (beam-sdks-java-io-kafka), we should probably put that as a maven
> > snippet in the README as well. Feel free to create a PR with this README
> on
> > GitHub.
> >
> > Regarding the long series of links you need to click in the site in order
> > to get to these docs, I agree, perhaps we can reduce this number of
> clicks,
> > feel free to create a PR on the beam-site repo on GitHub.
> >
> > Welcome to the community!
> >
> > On Tue, Aug 29, 2017 at 11:33 AM Jean-Baptiste Onofré 
> > wrote:
> >
> > > Agree to add the link to javadoc on the I/O list:
> > >
> > > https://beam.apache.org/documentation/io/built-in/
> > >
> > > Regards
> > > JB
> > >
> > > On 08/29/2017 10:28 AM, Joey Baruch wrote:
> > > > Hey all,
> > > >
> > > > As a new user trying to use a kafkaIO source/sink i couldn't find any
> > > > documentation easily.
> > > > The documentation page <
> > > https://beam.apache.org/documentation/io/built-in/>,
> > > > (which you get to from headder -> doccumentaton -> pipeline i/o ->
> > built
> > > in
> > > > i/o transforms) leads to the kafka class
> > > > , but
> > > there
> > > > is no docs there.
> > > >
> > > > I will add a simple readme that points to the classe's javadocs.
> > > >
> > > > regards
> > > > Joey Baruch
> > > >
> > >
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>


Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Raghu Angadi
Can we define what exactly is meant by deterministic/stable/replayable etc?

   - Does it imply a fixed order? If yes, it implies fixed order of
   processElement() invocations, right? Are there any qualifiers (within a
   window+key etc)?
   - Does it also imply fixed length and content for value iterators?
   - Some examples to clarify nuances would be very useful.

The state durability semantics for timers that Reuven proposed seem to be
unrelated to stable input (at the model level). It might be simpler to add
those semantics first. A lot of deterministic side-effect issues can be
handled by durable state in timers. One thing I like about the timers approach
is that it makes the cost more transparent to the user, since the state is
explicitly stored.
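
To make the timers idea concrete, a rough sketch of how I read it (not a
worked-out proposal): buffer elements in state from processElement(), and do
the side effect only from the timer callback, the expectation in this approach
being that the buffered contents are durable, checkpointed state by then.
sendToExternalService() is a hypothetical stand-in for the side effect.

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class BufferedSideEffectFn extends DoFn<KV<String, String>, Void> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(ProcessContext ctx,
                             @StateId("buffer") BagState<String> buffer,
                             @TimerId("flush") Timer flush) {
    // Only buffer here; no side effects, since this call may be replayed.
    buffer.add(ctx.element().getValue());
    flush.offset(Duration.standardSeconds(10)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext ctx,
                      @StateId("buffer") BagState<String> buffer) {
    // Once the timer fires, the buffered elements live in per-key state, so
    // replays of the upstream processElement() calls no longer change them.
    for (String element : buffer.read()) {
      sendToExternalService(element); // hypothetical side effect
    }
    buffer.clear();
  }
}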


On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers  wrote:

> I think it only makes sense in places where a user might reasonable require
> stable input to ensure idempotency of side-effects. It also only makes
> sense in places where a runner could reasonably provide such a guarantee.
>
> A given Combine is unlikely to have side effects so it is less likely to
> benefit from stability of the input. Further, the reason writing a Combine
> is desirable is because its execution can be split up and moved to the
> mapper-side (before the GroupByKey). But this division is inherently
> non-deterministic, and so it seems unlikely to benefit from stability. And
> many cases where I could see wanting side-effects would end up in
> extractOutput, for which there is an easy (arguably better) solution --
> have extractOutput return the accumulators and do the side-effects in a
> DoFn afterwards.
>
> For composites, it is a bit trickier. I could see a case for supporting it
> on composites, but it would need to make it very clear that it only
> affected the input to the composite. If any of the operations within the
> composite were non-deterministic, then the outputs of that could be
> unstable, leading to instability in later parts of the composite. Further,
> it doesn't seem to offer much. The composite itself doesn't perform
> side-effects, so there is no benefit to having the annotation there --
> instead, we allow the annotation to be put where it is relevant and
> important -- on the DoFn's that actually have side-effects that require
> stability.
>
> On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax 
> wrote:
>
> > I don't think it really makes sense to to do this on Combine. And I agree
> > with you, it doesn't make sense on composites either.
> >
> > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner  >
> > wrote:
> >
> > > Does requires-stable-input only apply to ParDo transforms?
> > >
> > > I don't think it would make sense to annotate to composite, because
> > > checkpointing should happen as close to the side-effecting operation as
> > > possible, since upstream transforms within a composite could introduce
> > > non-determinism. So it's the primitive transform that should own the
> > > requirement.
> > >
> > > Are there other primitives that should be annotated? 'Combine' is
> > > interesting because it optimized in Dataflow (and perhaps other
> runners)
> > to
> > > partially apply before a GroupByKey.
> > >
> > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau
>  > >
> > > wrote:
> > >
> > > > +1 to the annotation idea, and to having it on processTimer.
> > > >
> > > > -Tyler
> > > >
> > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <
> aljos...@apache.org>
> > > > wrote:
> > > >
> > > > > +1 to the annotation approach. I outlined how implementing this
> would
> > > > work
> > > > > in the Flink runner in the Thread about the exactly-once Kafka
> Sink.
> > > > >
> > > > > > On 9. Aug 2017, at 23:03, Reuven Lax 
> > > wrote:
> > > > > >
> > > > > > Yes - I don't think we should try and make any deterministic
> > > guarantees
> > > > > > about what is in a bundle. Stability guarantees are per element
> > only.
> > > > > >
> > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh
> >  > > >
> > > > > > wrote:
> > > > > >
> > > > > >> +1 to the annotation-on-ProcessElement approach. ProcessElement
> is
> > > the
> > > > > >> minimum implementation requirement of a DoFn, and should be
> where
> > > the
> > > > > >> processing logic which depends on characteristics of the inputs
> > lie.
> > > > > It's a
> > > > > >> good way of signalling the requirements of the Fn, and letting
> the
> > > > > runner
> > > > > >> decide.
> > > > > >>
> > > > > >> I have a minor concern that this may not work as expected for
> > users
> > > > that
> > > > > >> try to batch remote calls in `FinishBundle` - we should make
> sure
> > we
> > > > > >> document that it is explicitly the input elements that will be
> > > > replayed,
> > > > > >> and bundles and other operational are still arbitrary.
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax
> > >  > > > >
> > > > > >> wrote:
> > > > > >>
> > > > > >>> I think deterministic here means deterministically

Re: Exactly-once Kafka sink

2017-08-10 Thread Raghu Angadi
On Thu, Aug 10, 2017 at 5:15 AM, Aljoscha Krettek 
wrote:

> Ah, also regarding your earlier mail: I didn't know if many people were
> using Kafka with Dataflow, thanks for that clarification! :-)
>
> Also, I don't think that the TwoPhaseCommit Sink of Flink would work in a
> Beam context, I was just posting that for reference.
>

Yep. It was pretty useful for understanding Flink checkpoint interactions with
operators. Will look into it more in the future when we want to add exactly-once
support for KafkaIO on Flink.


> Best,
> Aljoscha
> > On 10. Aug 2017, at 11:13, Aljoscha Krettek  wrote:
> >
> > @Raghu: Yes, exactly, that's what I thought about this morning,
> actually. These are the methods of an operator that are relevant to
> checkpointing:
> >
> > class FlinkOperator() {
> >  open();
> >  snapshotState():
> >  notifySnapshotComplete();
> >  initializeState();
> > }
> >
> > Input would be buffered in state, would be checkpointed in
> snapshotState() and processed when we receive a notification of a complete
> checkpoint (which is sent out once all operators have signaled that
> checkpointing is complete). In case of failure, we would be re-initialized
> with the buffered elements in initializeState() and could re-process them
> in open().
> >
> > This is somewhat expensive and leads to higher latency so we should only
> do it if the DoFn signals that it needs deterministic input.
> >
> > +Jingsong Who is working on something similar for the output produced in
> finishBundle().
> >
> >> On 9. Aug 2017, at 19:41, Raghu Angadi 
> wrote:
> >>
> >> Yep, an option to ensure replays see identical input would be pretty
> useful.
> >> It might be challenging on horizontally checkpointing runners like Flink
> >> (only way I see to buffer all the input in state and replay it after
> >> checkpoint).
> >>
> >> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax 
> >> wrote:
> >>
> >>> Please see Kenn's proposal. This is a generic thing that is lacking in
> the
> >>> Beam model, and only works today for specific runners. We should fix
> this
> >>> at the Beam level, but I don't think that should block your PR.
> >>>
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi
> 
> >>> wrote:
> >>>
> >>>> There are quite a few customers using KafkaIO with Dataflow. All of
> them
> >>>> are potential users of exactly-once sink. Dataflow Pubsub sink does
> not
> >>>> support EOS yet. Even among those customers, I do expect fraction of
> >>>> applications requiring EOS would be pretty small, that's why I don't
> >>> think
> >>>> extra shuffles are too expensive in overall cost yet.
> >>>>
> >>>> It is also not clear how Flink's 2-phase commit sink function could be
> >>> used
> >>>> in Beam's context. Beam could add some checkpoint semantics to
> state-API
> >>> so
> >>>> that all the runners could support in platform specific way.
> >>>>
> >>>> Took a look at Flink PR, commented on a few issues I see in comments
> >>> there
> >>>> : https://github.com/apache/flink/pull/4239. May be an extra shuffle
> or
> >>>> storing all them messages in state can get over those.
> >>>>
> >>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek  >
> >>>> wrote:
> >>>>
> >>>>> Yes, I think making this explicit would be good. Having a
> >>> transformation
> >>>>> that makes assumptions about how the runner implements certain things
> >>> is
> >>>>> not optimal. Also, I think that most people probably don't use Kafka
> >>> with
> >>>>> the Dataflow Runner (because GCE has Pubsub, but I'm guest guessing
> >>>> here).
> >>>>> This would mean that the intersection of "people who would benefit
> from
> >>>> an
> >>>>> exactly-once Kafka sink" and "people who use Beam on Dataflow" is
> >>> rather
> >>>>> small, and therefore not many people would benefit from such a
> >>> Transform.
> >>>>>
> >>>>> This is all just conjecture, of course.
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >

Re: Exactly-once Kafka sink

2017-08-09 Thread Raghu Angadi
Yep, an option to ensure replays see identical input would be pretty useful.
It might be challenging on horizontally checkpointing runners like Flink
(the only way I see is to buffer all the input in state and replay it after a
checkpoint).

On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax 
wrote:

> Please see Kenn's proposal. This is a generic thing that is lacking in the
> Beam model, and only works today for specific runners. We should fix this
> at the Beam level, but I don't think that should block your PR.
>
>
> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi 
> wrote:
>
> > There are quite a few customers using KafkaIO with Dataflow. All of them
> > are potential users of exactly-once sink. Dataflow Pubsub sink does not
> > support EOS yet. Even among those customers, I do expect fraction of
> > applications requiring EOS would be pretty small, that's why I don't
> think
> > extra shuffles are too expensive in overall cost yet.
> >
> > It is also not clear how Flink's 2-phase commit sink function could be
> used
> > in Beam's context. Beam could add some checkpoint semantics to state-API
> so
> > that all the runners could support in platform specific way.
> >
> > Took a look at Flink PR, commented on a few issues I see in comments
> there
> > : https://github.com/apache/flink/pull/4239. May be an extra shuffle or
> > storing all them messages in state can get over those.
> >
> > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek 
> > wrote:
> >
> > > Yes, I think making this explicit would be good. Having a
> transformation
> > > that makes assumptions about how the runner implements certain things
> is
> > > not optimal. Also, I think that most people probably don't use Kafka
> with
> > > the Dataflow Runner (because GCE has Pubsub, but I'm guest guessing
> > here).
> > > This would mean that the intersection of "people who would benefit from
> > an
> > > exactly-once Kafka sink" and "people who use Beam on Dataflow" is
> rather
> > > small, and therefore not many people would benefit from such a
> Transform.
> > >
> > > This is all just conjecture, of course.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 8. Aug 2017, at 23:34, Reuven Lax 
> wrote:
> > > >
> > > > I think the issue we're hitting is how to write this in Beam.
> > > >
> > > > Dataflow historically guaranteed checkpointing at every GBK (which
> due
> > to
> > > > the design of Dataflow's streaming shuffle was reasonably efficient).
> > In
> > > > Beam we never formalized these semantics, leaving these syncs in a
> gray
> > > > area. I believe the Spark runner currently checkpoints the RDD on
> every
> > > > GBK, so these unwritten semantics currently work for Dataflow and for
> > > Spark.
> > > >
> > > > We need someway to express this operation in Beam, whether it be via
> an
> > > > explicit Checkpoint() operation or via marking DoFns as having side
> > > > effects, and having the runner automatically insert such a Checkpoint
> > in
> > > > front of them. In Flink, this operation can be implemented using what
> > > > Aljoscha posted.
> > > >
> > > > Reuven
> > > >
> > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <
> aljos...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> In Flink, there is a TwoPhaseCommit SinkFunction that can be used
> for
> > > such
> > > >> cases: [1]. The PR for a Kafka 0.11 exactly once producer builds on
> > > that:
> > > >> [2]
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> [1] https://github.com/apache/flink/blob/
> > 62e99918a45b7215c099fbcf160d45
> > > >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> > > >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.
> > java#L55
> > > <
> > > >> https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45
> > > >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> > > >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.
> > java#L55>
> > > >> [2] https://github.com/apache/flink/pull/4239
> > > >>> On 3. Aug 2017, 

Re: Exactly-once Kafka sink

2017-08-09 Thread Raghu Angadi
There are quite a few customers using KafkaIO with Dataflow. All of them
are potential users of the exactly-once sink. The Dataflow Pubsub sink does not
support EOS yet. Even among those customers, I expect the fraction of
applications requiring EOS to be pretty small, which is why I don't think the
extra shuffles are too expensive in overall cost yet.

It is also not clear how Flink's 2-phase commit sink function could be used
in Beam's context. Beam could add some checkpoint semantics to the state API so
that all the runners could support it in a platform-specific way.

Took a look at the Flink PR and commented on a few issues I see in the comments
there: https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
storing all the messages in state can get over those.

On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek 
wrote:

> Yes, I think making this explicit would be good. Having a transformation
> that makes assumptions about how the runner implements certain things is
> not optimal. Also, I think that most people probably don't use Kafka with
> the Dataflow Runner (because GCE has Pubsub, but I'm guest guessing here).
> This would mean that the intersection of "people who would benefit from an
> exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
> small, and therefore not many people would benefit from such a Transform.
>
> This is all just conjecture, of course.
>
> Best,
> Aljoscha
>
> > On 8. Aug 2017, at 23:34, Reuven Lax  wrote:
> >
> > I think the issue we're hitting is how to write this in Beam.
> >
> > Dataflow historically guaranteed checkpointing at every GBK (which due to
> > the design of Dataflow's streaming shuffle was reasonably efficient). In
> > Beam we never formalized these semantics, leaving these syncs in a gray
> > area. I believe the Spark runner currently checkpoints the RDD on every
> > GBK, so these unwritten semantics currently work for Dataflow and for
> Spark.
> >
> > We need someway to express this operation in Beam, whether it be via an
> > explicit Checkpoint() operation or via marking DoFns as having side
> > effects, and having the runner automatically insert such a Checkpoint in
> > front of them. In Flink, this operation can be implemented using what
> > Aljoscha posted.
> >
> > Reuven
> >
> > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek 
> > wrote:
> >
> >> Hi,
> >>
> >> In Flink, there is a TwoPhaseCommit SinkFunction that can be used for
> such
> >> cases: [1]. The PR for a Kafka 0.11 exactly once producer builds on
> that:
> >> [2]
> >>
> >> Best,
> >> Aljoscha
> >>
> >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45
> >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> <
> >> https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45
> >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55>
> >> [2] https://github.com/apache/flink/pull/4239
> >>> On 3. Aug 2017, at 04:03, Raghu Angadi 
> >> wrote:
> >>>
> >>> Kafka 0.11 added support for transactions[1], which allows end-to-end
> >>> exactly-once semantics. Beam's KafkaIO users can benefit from these
> while
> >>> using runners that support exactly-once processing.
> >>>
> >>> I have an implementation of EOS support for Kafka sink :
> >>> https://github.com/apache/beam/pull/3612
> >>> It has two shuffles and builds on Beam state-API and checkpoint barrier
> >>> between stages (as in Dataflow). Pull request has a longer description.
> >>>
> >>> - What other runners in addition to Dataflow would be compatible with
> >> such
> >>> a strategy?
> >>> - I think it does not quite work for Flink (as it has a global
> >> checkpoint,
> >>> not between the stages). How would one go about implementing such a
> sink.
> >>>
> >>> Any comments on the pull request are also welcome.
> >>>
> >>> Thanks,
> >>> Raghu.
> >>>
> >>> [1]
> >>> https://www.confluent.io/blog/exactly-once-semantics-are-
> >> possible-heres-how-apache-kafka-does-it/
> >>
> >>
>
>


Re: Exactly-once Kafka sink

2017-08-08 Thread Raghu Angadi
That's awesome! I will look into it more this weekend.

On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> In Flink, there is a TwoPhaseCommit SinkFunction that can be used for such
> cases: [1]. The PR for a Kafka 0.11 exactly once producer builds on that:
> [2]
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45
> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55 <
> https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45
> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55>
> [2] https://github.com/apache/flink/pull/4239
> > On 3. Aug 2017, at 04:03, Raghu Angadi 
> wrote:
> >
> > Kafka 0.11 added support for transactions[1], which allows end-to-end
> > exactly-once semantics. Beam's KafkaIO users can benefit from these while
> > using runners that support exactly-once processing.
> >
> > I have an implementation of EOS support for Kafka sink :
> > https://github.com/apache/beam/pull/3612
> > It has two shuffles and builds on Beam state-API and checkpoint barrier
> > between stages (as in Dataflow). Pull request has a longer description.
> >
> > - What other runners in addition to Dataflow would be compatible with
> such
> > a strategy?
> > - I think it does not quite work for Flink (as it has a global
> checkpoint,
> > not between the stages). How would one go about implementing such a sink.
> >
> > Any comments on the pull request are also welcome.
> >
> > Thanks,
> > Raghu.
> >
> > [1]
> > https://www.confluent.io/blog/exactly-once-semantics-are-
> possible-heres-how-apache-kafka-does-it/
>
>


Exactly-once Kafka sink

2017-08-02 Thread Raghu Angadi
Kafka 0.11 added support for transactions[1], which allows end-to-end
exactly-once semantics. Beam's KafkaIO users can benefit from these while
using runners that support exactly-once processing.

I have an implementation of EOS support for the Kafka sink:
https://github.com/apache/beam/pull/3612
It has two shuffles and builds on the Beam state API and the checkpoint barrier
between stages (as in Dataflow). The pull request has a longer description.

- What other runners in addition to Dataflow would be compatible with such
a strategy?
- I think it does not quite work for Flink (as it has a global checkpoint,
not one between the stages). How would one go about implementing such a sink?

Any comments on the pull request are also welcome.

Thanks,
Raghu.

[1]
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/


Re: Style of messages for checkArgument/checkNotNull in IOs

2017-07-28 Thread Raghu Angadi
On Fri, Jul 28, 2017 at 11:21 AM, Thomas Groh 
wrote:

> I'm in favor of the wording in the style of the first: it's an immediately
> actionable message that will have an associated stack trace, but will
> provide the parameter in plaintext so the caller doesn't have to dig
> through the invoked code, they can just look at the documentation.
>
> I've recently been convinced that all input validation should go through
> `checkArgument` (including for nulls) rather than 'checkNotNull', due to
> the type of exception thrown, so I'd usually prefer using that as the
> `Preconditions` method. Beyond that, +1
>

+1. For me, the main requirement is that the message should be aimed at the end
user and should be actionable without having to look at the code. It is always
a delight to receive a useful error message as early as possible. This also
applies when checks are made in validate().
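
For instance, a tiny sketch with hypothetical getters (getTopic, getBatchSize),
just to show what I mean by actionable, including a validation-time check:

import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.options.PipelineOptions;

@Override
public void validate(PipelineOptions options) {
  // Same style-1 guidance applied at validation time: short, actionable messages.
  checkArgument(getTopic() != null, "topic must be set");
  checkArgument(getBatchSize() > 0,
      "batchSize must be positive, but was: %s", getBatchSize());
}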


>
> On Fri, Jul 28, 2017 at 11:17 AM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Hey all,
> >
> > I think this has been discussed before on a JIRA issue but I can't find
> it,
> > so raising again on the mailing list.
> >
> > Various IO (and non-IO) transforms validate their builder parameters
> using
> > Preconditions.checkArgument/checkNotNull, and use different styles for
> > error messages. There are 2 major styles:
> >
> > 1) style such as:
> > checkNotNull(username, "username"), or checkArgument(username != null,
> > "username can not be null") or checkArgument(username != null,
> > "username must be set");
> > checkArgument(batchSize > 0, "batchSize must be non-negative, but was:
> %s",
> > batchSize)
> >
> > 2) style such as:
> >   checkArgument(
> >username != null,
> >"ConnectionConfiguration.create().withBasicCredentials(
> > username,
> > password) "
> >+ "called with null username");
> >checkArgument(
> >!username.isEmpty(),
> >"ConnectionConfiguration.create().withBasicCredentials(
> > username,
> > password) "
> >+ "called with empty username");
> >
> > Style 2 is recommended by the PTransform Style Guide
> > https://beam.apache.org/contribute/ptransform-style-guide/#transform-
> > configuration-errors
> >
> > However:
> > 1) The usage of these two styles is not consistent - both are used in
> about
> > the same amounts in Beam IOs.
> > 2) Style 2 seems unnecessarily verbose to me. The exception thrown from a
> > checkArgument or checkNotNull already includes the method being called
> into
> > the stack trace, so I don't think the message needs to include the
> method.
> > 3) Beam is not the first Java project to have validation of configuration
> > parameters of something or another, and I don't think I've seen something
> > as verbose as style 2 used anywhere else in my experience of writing
> Java.
> >
> > What do people think about changing the guidance in favor of style 1?
> >
> > Specifically change the following example:
> >
> > public Twiddle withMoo(int moo) {
> >   checkArgument(moo >= 0 && moo < 100,
> >   "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
> >   + "Valid values are 0 (inclusive) to 100 (exclusive)",
> >   moo);
> >   return toBuilder().setMoo(moo).build();}
> >
> > into the following:
> >
> > public Twiddle withMoo(int moo) {
> >   checkArgument(moo >= 0 && moo < 100,
> >   "Valid values for moo are 0 (inclusive) to 100 (exclusive), "
> >   + "but was: %s",
> >   moo);
> >   return toBuilder().setMoo(moo).build();}
> >
> >
> > And in simpler cases such as non-null checks:
> > public Twiddle withUsername(String username) {
> >   checkNotNull(username, "username");
> >   checkArgument(!username.isEmpty(), "username can not be empty");
> >   ...
> > }
> >
>


Re: [DISCUSS] Apache Beam 2.1.0 release next week ?

2017-07-05 Thread Raghu Angadi
I would like to request merging two Kafka-related PRs: #3461 and #3492.
Especially the second one, as it improves the user experience in case of a
server misconfiguration that prevents connections between the workers and the
Kafka cluster.

On Wed, Jul 5, 2017 at 8:10 AM, Jean-Baptiste Onofré 
wrote:

> FYI, the release branch has been created.
>
> I plan to do the RC1 tomorrow, so you have time to cherry-pick if wanted ;)
>
> Regards
> JB
>
>
> On 07/05/2017 07:52 AM, Jean-Baptiste Onofré wrote:
>
>> Hi,
>>
>> I'm building with the last changes and I will cut the release branch just
>> after.
>>
>> I keep you posted.
>>
>> Regards
>> JB
>>
>> On 07/03/2017 05:37 PM, Jean-Baptiste Onofré wrote:
>>
>>> Hi guys,
>>>
>>> The 2.1.0 release branch will be great in a hour or so.
>>>
>>> I updated Jira, please, take a look and review the one assigned to you
>>> where I left a comment.
>>>
>>> Thanks !
>>> Regards
>>> JB
>>>
>>> On 07/01/2017 07:06 AM, Jean-Baptiste Onofré wrote:
>>>
 It sounds good Kenn. Thanks.

 I will ask in the Jira.

 Thanks !
 Regards
 JB

 On 07/01/2017 06:58 AM, Kenneth Knowles wrote:

> SGTM
>
> There are still 23 open issues tagged with 2.1.0. Since this is not
> reduced
> from last time, I think it is fair to ask them to be cherry-picked to
> the
> release branch or deferred.
>
> To the assignees of these issues: can you please evaluate whether
> completion is imminent?
>
> I want to also note that many PMC members have Monday and Tuesday off,
> providing a strong incentive to take the whole week off. So I suggest
> July
> 10 as the earliest day for RC1.
>
> On Fri, Jun 30, 2017 at 8:53 PM, Jean-Baptiste Onofré  >
> wrote:
>
> Hi,
>>
>> The build is now back to normal, I will create the release branch
>> today.
>>
>> Regards
>> JB
>>
>>
>> On 06/29/2017 03:22 PM, Jean-Baptiste Onofré wrote:
>>
>> FYI,
>>>
>>> I opened https://github.com/apache/beam/pull/3471 to fix the
>>> SpannerIO
>>> test on my machine. I don't understand how the test can pass without
>>> defining the project ID (it should always fail on the precondition).
>>>
>>> I will create the release branch once this PR is merged.
>>>
>>> Regards
>>> JB
>>>
>>> On 06/29/2017 06:29 AM, Jean-Baptiste Onofré wrote:
>>>
>>> Hi Stephen,

 Thanks for the update.

 I have an issue on my machine with SpannerIOTest. I will create the
 release branch as soon as this is fix. Then, we will be able to
 cherry-pick
 the fix we want.

 I keep you posted.

 Regards
 JB

 On 06/28/2017 09:37 PM, Stephen Sisk wrote:

 hi!
>
> I'm hopeful we can get the fix for BEAM-2533 into this release as
> well,
> there's a bigtable fix in the next version that'd be good to have.
> The
> bigtable client release should be in the next day or two.
>
> S
>
> On Mon, Jun 26, 2017 at 12:03 PM Jean-Baptiste Onofré <
> j...@nanthrax.net>
> wrote:
>
> Hi guys,
>
>>
>> just a quick update about the 2.1.0 release.
>>
>> I will complete the Jira triage tomorrow.
>>
>> I plan to create the release branch Wednesday.
>>
>> Thanks !
>> Regards
>> JB
>>
>> On 06/22/2017 04:23 AM, Jean-Baptiste Onofré wrote:
>>
>> Hi guys,
>>>
>>> As we released 2.0.0 (first stable release) last month during
>>> ApacheCon,
>>>
>>> and to
>>
>> maintain our release pace, I would like to release 2.1.0 next
>>> week.
>>>
>>> This release would include lot of bug fixes and some new
>>> features:
>>>
>>> https://issues.apache.org/jira/projects/BEAM/versions/12340528
>>>
>>> I'm volunteer to be release manager for this one.
>>>
>>> Thoughts ?
>>>
>>> Thanks,
>>> Regards
>>> JB
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>>
>>
>

>>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>>
>

>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: KafkaIO, Warning on offset gap

2017-06-28 Thread Raghu Angadi
Fixing it in https://github.com/apache/beam/pull/3461.

Thanks for reporting the issue.

On Wed, Jun 28, 2017 at 8:37 AM, Raghu Angadi  wrote:

> Hi Elmar,
>
> You are right. We should not log this at all when the gaps are expected as
> you pointed out. I don't think client can check if compaction is enabled
> for a topic through Consumer api.
>
> I think we should remove the log. The user can't really act on it other
> than reporting it. I will send a PR.
>
> As a temporary work around you can disable logging for a particular class
> on the worker with --workerLogLevelOverrides
> <https://cloud.google.com/dataflow/pipelines/logging> option. But this
> this would suppress rest of the logging the reader.
>
> Raghu
>
>
> On Wed, Jun 28, 2017 at 4:12 AM, Elmar Weber  wrote:
>
>> Hello,
>>
>> I'm testing the KafkaIO with Google Cloud dataflow and getting warnings
>> when working with compacted logs. In the code there is a relevant check:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafk
>> a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1158
>>
>> // sanity check
>> if (offset != expected) {
>>   LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
>>   this, pState.topicPartition, expected, offset - expected);
>> }
>>
>> From what I understand, this can happen when log compaction is enabled
>> because the relevant entry can get cleaned up by Kafka with a newer one.
>> In this case, shouldn't this be a info log and / or warn only when log
>> compaction is disabled for the topic?
>>
>>
>> I'm still debugging some stuff because the pipeline also stops reading on
>> compacted logs, I'm not sure if this related / could also be an issue with
>> my Kafka test installation, but as far as I understand the gaps are
>> expected behaviour with log compaction enabled.
>>
>> Thanks,
>> Elmar
>>
>>
>


Re: KafkaIO, Warning on offset gap

2017-06-28 Thread Raghu Angadi
Hi Elmar,

You are right. We should not log this at all when the gaps are expected, as
you pointed out. I don't think the client can check whether compaction is
enabled for a topic through the Consumer API.

I think we should remove the log. The user can't really act on it other
than reporting it. I will send a PR.

As a temporary workaround you can disable logging for a particular class
on the worker with the --workerLogLevelOverrides option. But this
would suppress the rest of the logging from the reader.
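
For example (the class name to override is an assumption on my part — it should
be whichever logger emits the warning — and the value follows the JSON-map
format of Dataflow's --workerLogLevelOverrides option):

// Hypothetical pipeline arguments for the temporary workaround.
String[] args = new String[] {
    "--runner=DataflowRunner",
    "--workerLogLevelOverrides={\"org.apache.beam.sdk.io.kafka.KafkaIO\":\"ERROR\"}"
};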

Raghu


On Wed, Jun 28, 2017 at 4:12 AM, Elmar Weber  wrote:

> Hello,
>
> I'm testing the KafkaIO with Google Cloud dataflow and getting warnings
> when working with compacted logs. In the code there is a relevant check:
>
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafk
> a/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1158
>
> // sanity check
> if (offset != expected) {
>   LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
>   this, pState.topicPartition, expected, offset - expected);
> }
>
> From what I understand, this can happen when log compaction is enabled
> because the relevant entry can get cleaned up by Kafka with a newer one.
> In this case, shouldn't this be a info log and / or warn only when log
> compaction is disabled for the topic?
>
>
> I'm still debugging some stuff because the pipeline also stops reading on
> compacted logs, I'm not sure if this related / could also be an issue with
> my Kafka test installation, but as far as I understand the gaps are
> expected behaviour with log compaction enabled.
>
> Thanks,
> Elmar
>
>


Re: low availability in the coming 4 weeks

2017-05-25 Thread Raghu Angadi
Congrats Mingmin. All the best!

On Wed, May 24, 2017 at 8:33 PM, Mingmin Xu  wrote:

> Hello everyone,
>
> I'll take 4 weeks off to take care of my new born baby. I'm very glad that
> James Xu agrees to take my role in Beam SQL feature.
>
> Ps, I'll consolidate the PR for BEAM-2010 soon before that.
>
> Thank you!
> 
> Mingmin
>


Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-21 Thread Raghu Angadi
Expanding a bit more on what Dan wrote:

   - In Dataflow, there are two modes of restarting a job: a regular stop
   and then start, and an *update*. The checkpoint is carried over only in
   the case of an update.
   - Update is the only way to keep 'exactly-once' semantics.
   - If the requirements are not very strict, you can enable offset commits
   in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
   start reading from approximately where it left off in the previous run.
  - When offset commits are enabled, KafkaIO could do this by implementing
  the 'finalize()' API on KafkaCheckpointMark [1] (a rough sketch follows
  below).
  - This is runner independent.
  - The compromise is that this might skip a few records or read a few
  old records when the pipeline is restarted.
  - This does not override 'resume from checkpoint' support when the runner
  provides a KafkaCheckpointMark. Externally committed offsets are used only
  when KafkaIO's own CheckpointMark is not available.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
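
To make the 'finalize()' idea above concrete, here is a rough sketch (not the
actual KafkaIO code; the PartitionMark fields and the way a consumer is
obtained are assumptions for illustration) of committing the offsets captured
in a checkpoint mark back to Kafka:

// Hedged sketch only -- not the real KafkaIO implementation.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CheckpointFinalizeSketch {
  // Stand-in for KafkaCheckpointMark.PartitionMark (field names are assumptions).
  static class PartitionMark {
    final String topic;
    final int partition;
    final long nextOffset;
    PartitionMark(String topic, int partition, long nextOffset) {
      this.topic = topic;
      this.partition = partition;
      this.nextOffset = nextOffset;
    }
  }

  // Called from finalizeCheckpoint(): commit the next offset to read for each
  // partition so a restarted pipeline can resume near where it left off.
  static void commitOffsets(Consumer<?, ?> consumer, Iterable<PartitionMark> marks) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (PartitionMark p : marks) {
      offsets.put(new TopicPartition(p.topic, p.partition),
                  new OffsetAndMetadata(p.nextOffset));
    }
    consumer.commitSync(offsets);
  }
}

On a restart without a Beam checkpoint, the reader would then start from these
externally committed offsets, at the cost of possibly skipping or re-reading a
few records, as noted above.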

On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin  wrote:

> [We should keep user list involved if that's where the discussion
> originally was :)]
>
> Jins George's original question was a good one. The right way to resume
> from the previous offset here is what we're already doing – use the
> KafkaCheckpointMark. In Beam, the runner maintains the state and not the
> external system. Beam runners are responsible for maintaining the
> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
> a user disables checkpointing, then they are explicitly opting into "redo
> all work" on restart.
>
> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
> provided, then I'm inclined to agree with Amit that there may simply be a
> bug in the FlinkRunner. (+aljoscha)
>
> For what Mingmin Xu asked about: presumably if the Kafka source is
> initially configured to "read from latest offset", when it restarts with no
> checkpoint this will automatically go find the latest offset. That would
> mimic at-most-once semantics in a buggy runner that did not provide
> checkpointing.
>
> Dan
>
> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu  wrote:
>
>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
>> Can it restore during a job restart? (I have not tested the runner in
>> streaming for some time.)
>>
>> Regarding data completeness, I would use at-most-once when a small amount
>> of missing data (mostly from task-node failures) is tolerated, compared to
>> the performance cost introduced by 'state'/'checkpoint'.
>>
>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela  wrote:
>>
>> > On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu  wrote:
>> >
>> > > Move discuss to dev-list
>> > >
>> > > Savepoint in Flink, also checkpoint in Spark, should be good enough to
>> > > handle this case.
>> > >
>> > > When people don't enable these features, for example only need
>> > at-most-once
>> > >
>> > The Spark runner forces checkpointing on any streaming (Beam)
>> application,
>> > mostly because it uses mapWithState for reading from UnboundedSource and
>> > updateStateByKey for GroupByKey - so by design, the Spark runner is
>> > at-least-once. Generally, I always thought that applications that
>> require
>> > at-most-once are more focused on processing time only, as they only care
>> > about whatever gets ingested into the pipeline at a specific time and
>> > don't care (up to the point of losing data) about correctness.
>> > I would be happy to hear more about your use case.
>> >
>> > > semantics, each unbounded IO should try its best to restore from the
>> > > last offset, even though the CheckpointMark is null. Any ideas?
>> > >
>> > > Mingmin
>> > >
>> > > On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin 
>> > wrote:
>> > >
>> > > > hey,
>> > > >
>> > > > The native Beam UnboundedSource API supports resuming from
>> checkpoint
>> > --
>> > > > that specifically happens here
>> > > > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>> > > > when
>> > > > the KafkaCheckpointMark is non-null.
>> > > >
>> > > > The FlinkRunner should be providing the KafkaCheckpointMark from the
>> > most
>> > > > recent savepoint upon restore.
>> > > >
>> > > > There shouldn't be any "special" Flink runner support needed, nor is
>> > the
>> > > > State API involved.
>> > > >
>> > > > Dan
>> > > >
>> > > > On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <
>> j...@nanthrax.net
>> > >
>> > > > wrote:
>> > > >
>> > > >> Would not it be Flink runner specific ?
>> > > >>
>> > > >> Maybe the State API could do the same in a runner agnostic way
>> (just
>> > > >> thinking loud) ?
>> > > >>
>> > > >> Regards
>> > > >> JB
>> > > >>
>> > > >> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
>> > > >>
>> > > >>> From KafkaIO itself, looks like it either start_from

Re: We've hit 2000 PRs!

2017-02-16 Thread Raghu Angadi
Amazing to see this, Dan.

Congrats and thanks to the committers. You have been so welcoming and
promptly reviewed so many patches.

I know from my own personal experience how much dedication and time it
demands.

thank you!
Raghu.

On Thu, Feb 16, 2017 at 8:05 AM, Dan Halperin 
wrote:

> Checking my previous claims:
>
> PR #1: Feb 26, 2016
> PR #1000: Sep 24, 2016 (211 days later)
> PR #2000: Feb 13, 2017 (142 days later) Yep -- much quicker!
>
> I'm excited to see this community growing and innovating as we march
> towards the true Beam Technical Vision, a first major release, and really
> empowering users to build portable, long-lived, fast data processing
> pipelines.
>
> Thanks everyone for making this community and keeping this project really
> fun :)
>
> Dan
>
> On Mon, Sep 26, 2016 at 2:47 PM, Dan Halperin  wrote:
>
> > Hey folks!
> >
> > Just wanted to send out a note -- we've hit 1000 PRs in GitHub as of
> > Saturday! That's a tremendous amount of work for the 7 months since PR#1.
> >
> > I bet we hit 2000 in much fewer than 7 months ;)
> >
> > Dan
> >
>


Re: Merge HadoopInputFormatIO and HDFSIO in a single module

2017-02-16 Thread Raghu Angadi
FileInputFormat is extremely widely used; pretty much all the file-based
input formats extend it. All of them call into it to list the input files
and compute splits (with some tweaks on top of that). The special API
(*FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes)*) is how
the split size is normally communicated. The new IO can use this API
directly.
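
For illustration, a minimal sketch using the plain Hadoop mapreduce API (not
Beam code; the input path and bundle size are made-up values) of how a desired
bundle size could be passed down to FileInputFormat:

// Hedged sketch: ask FileInputFormat for splits of roughly the desired size.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

class SplitSizeSketch {
  static List<InputSplit> splitsFor(String inputPath, long desiredBundleSizeBytes)
      throws Exception {
    Job job = Job.getInstance(new Configuration());
    FileInputFormat.addInputPath(job, new Path(inputPath));
    // The API mentioned above: communicates the minimum split size, which
    // FileInputFormat honors when computing splits.
    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
    return new TextInputFormat().getSplits(job);
  }
}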

HdfsIO as implemented in Beam is not HDFS-specific at all. There are no
hdfs imports, and the HDFS name does not appear anywhere other than in
HdfsIO's own class and method names. AvroHdfsFileSource etc. would work just
as well with the new IO.

On Thu, Feb 16, 2017 at 8:17 AM, Dan Halperin 
wrote:

> (And I think renaming to HadoopIO doesn't make sense. "InputFormat" is the
> key component of the name -- it reads things that implement the InputFormat
> interface. "Hadoop" means a lot more than that.)
>

Often 'IO' in Beam implies both sources and sinks. It might not be long
before we support Hadoop OutputFormat as well. In addition,
HadoopInputFormatIO is quite a mouthful. Agreed, Hadoop can mean a lot of
things depending on the context; in an 'IO' context it might not be too
broad. Normally it implies 'any FileSystem supported in Hadoop, e.g. S3'.

Either way, I am quite confident once HadoopInputFormatIO is written, it
can easily replace HdfsIO. That decision could be made later.

Raghu.


Re: Merge HadoopInputFormatIO and HDFSIO in a single module

2017-02-15 Thread Raghu Angadi
Dipti,

Also how about calling it just HadoopIO?

On Wed, Feb 15, 2017 at 11:13 AM, Raghu Angadi  wrote:

> I skimmed through HdfsIO and I think it is essentially HadoopInputFormatIO
> with FileInputFormat. I would pretty much move most of the code to
> HadoopInputFormatIO (just make HdfsIO a specific instance of HIF_IO).
>
> On Wed, Feb 15, 2017 at 9:15 AM, Dipti Kulkarni <
> dipti_dkulka...@persistent.com> wrote:
>
>> Hello there!
>> I am working on writing a Read IO for Hadoop InputFormat. This will
>> enable reading from any datasource which supports Hadoop InputFormat, i.e.
>> provides source to read from InputFormat for integration with Hadoop.
>> It makes sense for the HadoopInputFormatIO to share some code with the
>> HdfsIO - WritableCoder in particular, but also some helper classes like
>> SerializableSplit etc. I was wondering if we could move HDFS and
>> HadoopInputFormat into a shared module for Hadoop IO in general instead of
>> maintaining them separately.
>> Do let me know on what you think, please let me know if you can think of
>> any other ideas too.
>>
>> Thanks,
>> Dipti
>>
>>
>> DISCLAIMER
>> ==
>> This e-mail may contain privileged and confidential information which is
>> the property of Persistent Systems Ltd. It is intended only for the use of
>> the individual or entity to which it is addressed. If you are not the
>> intended recipient, you are not authorized to read, retain, copy, print,
>> distribute or use this message. If you have received this communication in
>> error, please notify the sender and delete all copies of this message.
>> Persistent Systems Ltd. does not accept any liability for virus infected
>> mails.
>>
>>
>


Re: Merge HadoopInputFormatIO and HDFSIO in a single module

2017-02-15 Thread Raghu Angadi
I skimmed through HdfsIO and I think it is essentially HadoopInputFormatIO
with FileInputFormat. I would pretty much move most of the code to
HadoopInputFormatIO (just make HdfsIO a specific instance of HIF_IO).

On Wed, Feb 15, 2017 at 9:15 AM, Dipti Kulkarni <
dipti_dkulka...@persistent.com> wrote:

> Hello there!
> I am working on writing a Read IO for Hadoop InputFormat. This will enable
> reading from any datasource which supports Hadoop InputFormat, i.e.
> provides source to read from InputFormat for integration with Hadoop.
> It makes sense for the HadoopInputFormatIO to share some code with the
> HdfsIO - WritableCoder in particular, but also some helper classes like
> SerializableSplit etc. I was wondering if we could move HDFS and
> HadoopInputFormat into a shared module for Hadoop IO in general instead of
> maintaining them separately.
> Do let me know on what you think, please let me know if you can think of
> any other ideas too.
>
> Thanks,
> Dipti
>
>
> DISCLAIMER
> ==
> This e-mail may contain privileged and confidential information which is
> the property of Persistent Systems Ltd. It is intended only for the use of
> the individual or entity to which it is addressed. If you are not the
> intended recipient, you are not authorized to read, retain, copy, print,
> distribute or use this message. If you have received this communication in
> error, please notify the sender and delete all copies of this message.
> Persistent Systems Ltd. does not accept any liability for virus infected
> mails.
>
>


Re: Metrics for Beam IOs.

2017-02-14 Thread Raghu Angadi
On Tue, Feb 14, 2017 at 9:21 AM, Ben Chambers 
wrote:

>
> > * I also think there are data source specific metrics that a given IO
> will
> > want to expose (ie, things like kafka backlog for a topic.)


UnboundedSource already has an API for backlog, so it is better for
Beam/runners to handle backlog as well.
Of course there will be some source-specific metrics too (errors, I/O ops,
etc.).
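
For reference, a minimal sketch (not tied to any particular IO; how the
estimate is produced is left out) of how an unbounded reader reports backlog
through the standard API:

// Hedged sketch: backlog reported via UnboundedSource.UnboundedReader, which
// runners can surface as a metric and use for autoscaling decisions.
import org.apache.beam.sdk.io.UnboundedSource;

abstract class BacklogReportingReader extends UnboundedSource.UnboundedReader<byte[]> {
  private long estimatedBacklogBytes = BACKLOG_UNKNOWN;

  void updateBacklogEstimate(long bytes) {
    this.estimatedBacklogBytes = bytes;
  }

  @Override
  public long getSplitBacklogBytes() {
    // BACKLOG_UNKNOWN (-1) means "no estimate available".
    return estimatedBacklogBytes;
  }
}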


Re: BEAM-307(KafkaIO on Kafka 0.10)

2017-02-08 Thread Raghu Angadi
On Wed, Feb 8, 2017 at 12:19 PM, Jesse Anderson 
wrote:

> I'm not. There was a decent amount of time between the first 0.8 and 0.9
> release.
>

The changes that affect us are minor ones between 0.9 and 0.10 (e.g.
changing a vararg to Collection<>). Maybe both could have existed, with the
older one marked deprecated?


Re: BEAM-307(KafkaIO on Kafka 0.10)

2017-02-08 Thread Raghu Angadi
True.

I was commenting on the Kafka developers. I am surprised the API breakages
didn't have any deprecation period at all.

On Wed, Feb 8, 2017 at 12:02 PM, Xu Mingmin  wrote:

> i tend to have more versions supported, actually in our prod environment,
> there're 0.8, 0.9  and 0.10 for different teams. we'd take care of users
> who are on old versions.
>
>
> On Wed, Feb 8, 2017 at 10:56 AM, Raghu Angadi 
> wrote:
>
> > If we let the user pick their Kafka version in their dependencies, the
> > simplest fix is to broaden KafkaIO's kafka-clients dependency to something
> > like [0.9.1, 0.11) (and handle the API incompatibility at runtime).
> >
> > It might not be long before we could drop 0.9 support. Looking at these
> > API changes in the Kafka client API, made without any deprecation
> > warnings, I think Kafka does not expect older versions to linger much
> > longer either.
> >
> > On Wed, Feb 8, 2017 at 10:31 AM, Raghu Angadi 
> wrote:
> >
> > > What is the recommended way for users to bundle their app? The fix could
> > > be as simple as letting the user set the version in a Maven property
> > > ('kafka.client.version').
> >
>


Re: BEAM-307(KafkaIO on Kafka 0.10)

2017-02-08 Thread Raghu Angadi
If we let the user pick their Kafka version in their dependencies, the
simplest fix is to broaden KafkaIO's kafka-clients dependency to something
like [0.9.1, 0.11) (and handle the API incompatibility at runtime).

It might not be long before we could drop 0.9 support. Looking at these API
changes in the Kafka client API, made without any deprecation warnings, I
think Kafka does not expect older versions to linger much longer either.

On Wed, Feb 8, 2017 at 10:31 AM, Raghu Angadi  wrote:

> What is the recommended way for users to bundle their app? The fix could
> be as simple as letting the user set the version in a Maven property
> ('kafka.client.version').


Re: BEAM-307(KafkaIO on Kafka 0.10)

2017-02-08 Thread Raghu Angadi
What is the recommended way for users to bundle their app? The fix could be
as simple as letting the user set the version in a Maven property
('kafka.client.version'). The API incompatibility blocking Xu is small
enough (just the seekToEnd() call) that it can be handled at runtime with
reflection; it is called only once every 5 seconds.
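
For illustration, a hedged sketch (not the actual KafkaIO code) of handling
the seekToEnd() signature change via reflection, so a single jar works against
both the 0.9 varargs signature and the 0.10 Collection signature:

// Hedged sketch: call whichever seekToEnd() signature the runtime
// kafka-clients jar provides.
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class SeekToEndCompat {
  static void seekToEnd(Consumer<?, ?> consumer, TopicPartition tp) throws Exception {
    try {
      // Kafka 0.10+: seekToEnd(Collection<TopicPartition>)
      Method m = consumer.getClass().getMethod("seekToEnd", Collection.class);
      m.invoke(consumer, Collections.singletonList(tp));
    } catch (NoSuchMethodException e) {
      // Kafka 0.9: seekToEnd(TopicPartition... partitions)
      Method m = consumer.getClass().getMethod("seekToEnd", TopicPartition[].class);
      m.invoke(consumer, (Object) new TopicPartition[] {tp});
    }
  }
}

Since this runs only once every few seconds, the reflection overhead is
negligible.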


On Wed, Feb 8, 2017 at 10:09 AM, Stephen Sisk 
wrote:

> hi JB!
>
> Can you explain what you mean by easier to maintain? Is that because of
> maven dependency management, or some other factor? If we need to go with 2
> modules for dependency reasons, is there a way to share code between those
> two modules so that we don't have 2 copies of the code? (or is the copied
> code desirable b/c of very different code needs?)
>
> S
>
> On Mon, Feb 6, 2017 at 2:44 PM Jean-Baptiste Onofré 
> wrote:
>
> > Go ahead !
> >
> > My suggestion was to create one IO module per version (kafka-0.9,
> > kafka-0.10 for instance).
> >
> > I think it's easier to maintain.
> >
> > Regards
> > JB
> >
> > On Feb 6, 2017, 15:35, at 15:35, Xu Mingmin  wrote:
> > >Hello,
> > >
> > >Is there anybody working on
> > >https://issues.apache.org/jira/browse/BEAM-307?
> > >The existing KafkaIO is implemented with Kafka 0.9, and not compatible
> > >well
> > >with Kafka 0.10.
> > >
> > >I'd like to take this task if not duplicated:
> > >1). a new KafkaIO based on Kafka 0.10, suggest a separated project for
> > >easy-to-build;
> > >2). use timestamp of Kafka message as default event-timestamp
> > >
> > >Thanks!
> > >Mingmin
> >
>


Re: BEAM-307(KafkaIO on Kafka 0.10)

2017-02-06 Thread Raghu Angadi
I see. The kafka-clients dependency could also be in 'provided' scope so that
it is simpler to use different versions at runtime.

On Mon, Feb 6, 2017 at 12:05 PM, Xu Mingmin  wrote:

> The one I hit is the external authentication added in 0.10; we use a
> standalone token-based security service. In 0.9 the SASL-based
> implementation is tied to Kerberos.
> The Kafka 0.10 client cannot connect to a Kafka 0.9 server; that's why I
> mention a separate project.
>
> Mingmin
>
> On Mon, Feb 6, 2017 at 11:45 AM, Raghu Angadi 
> wrote:
>
> > Current KafkaIO works just fine with Kafka 0.10. I don't know of any
> > incompatibilities or regressions.
> >
> > It does not take advantage of message timestamps, of course. It would be
> > good to handle them in a backward-compatible way... it might be
> > required anyway if they are optional in 0.10.
> >
> > Not sure of scope of (1) below. I don't think it needs to be a new
> > implementation.
> >
> > On Mon, Feb 6, 2017 at 11:35 AM, Xu Mingmin  wrote:
> >
> > > Hello,
> > >
> > > Is there anybody working on https://issues.apache.org/
> > jira/browse/BEAM-307
> > > ?
> > > The existing KafkaIO is implemented with Kafka 0.9, and not compatible
> > well
> > > with Kafka 0.10.
> > >
> > > I'd like to take this task if not duplicated:
> > > 1). a new KafkaIO based on Kafka 0.10, suggest a separated project for
> > > easy-to-build;
> > > 2). use timestamp of Kafka message as default event-timestamp
> > >
> > > Thanks!
> > > Mingmin
> > >
> >
>


Re: BEAM-307(KafkaIO on Kafka 0.10)

2017-02-06 Thread Raghu Angadi
Current KafkaIO works just fine with Kafka 0.10. I don't know of any
incompatibilities or regressions.

It does not take advantage of message timestamps, of course. It would be
good to handle them in a backward-compatible way... it might be
required anyway if they are optional in 0.10.

Not sure of scope of (1) below. I don't think it needs to be a new
implementation.

On Mon, Feb 6, 2017 at 11:35 AM, Xu Mingmin  wrote:

> Hello,
>
> Is there anybody working on https://issues.apache.org/jira/browse/BEAM-307
> ?
> The existing KafkaIO is implemented with Kafka 0.9, and not compatible well
> with Kafka 0.10.
>
> I'd like to take this task if not duplicated:
> 1). a new KafkaIO based on Kafka 0.10, suggest a separated project for
> easy-to-build;
> 2). use timestamp of Kafka message as default event-timestamp
>
> Thanks!
> Mingmin
>

