Re: Multiple Outputs from Expand in Python

2019-10-25 Thread Robert Bradshaw
You can literally return a Python tuple of outputs from a composite
transform as well. (Dicts with PCollections as values are also
supported, if you want things to be named rather than referenced by
index.)
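
A minimal sketch of this (transform and label names here are illustrative,
not from the thread):

import apache_beam as beam

class SplitEvenOdd(beam.PTransform):
  def expand(self, pcoll):
    # A composite transform's expand may return a tuple (or dict) of
    # PCollections rather than a single one.
    evens = pcoll | 'Evens' >> beam.Filter(lambda x: x % 2 == 0)
    odds = pcoll | 'Odds' >> beam.Filter(lambda x: x % 2 == 1)
    return evens, odds  # or {'evens': evens, 'odds': odds}

with beam.Pipeline() as p:
  evens, odds = p | beam.Create([1, 2, 3, 4]) | SplitEvenOdd()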

On Fri, Oct 25, 2019 at 4:06 PM Ahmet Altay  wrote:
>
> Is DoOutputsTuple what you are looking for? [1] You can look at this expand 
> function using it [2].
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pvalue.py#L204
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1283
>
> On Fri, Oct 25, 2019 at 3:51 PM Luke Cwik  wrote:
>>
>> My example is about multiple inputs and not multiple outputs. From further 
>> investigation, it seems that I don't know.
>>
>> The documentation online[1] doesn't seem to specify how to do this for 
>> composite transforms either. All the examples are of the single 
>> output variety as well[2].
>>
>> 1: 
>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>> 2: 
>> https://github.com/apache/beam/blob/4ba731fe93f7f8385c771caf576745d14edf34b8/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
>>
>> On Fri, Oct 25, 2019 at 10:24 AM Luke Cwik  wrote:
>>>
>>> I believe PCollectionTuple should be unnecessary since Python has first 
>>> class support for tuples as shown in the example below[1]. Can we use 
>>> tuples to solve your issue?
>>>
>>> wordsStartingWithA = \
>>>     p | 'Words starting with A' >> beam.Create(['apple', 'ant', 'arrow'])
>>>
>>> wordsStartingWithB = \
>>>     p | 'Words starting with B' >> beam.Create(['ball', 'book', 'bow'])
>>>
>>> ((wordsStartingWithA, wordsStartingWithB)
>>>  | beam.Flatten()
>>>  | LogElements())
>>>
>>> 1: 
>>> https://github.com/apache/beam/blob/238659bce8043e6a64619a959ab44453dbe22dff/learning/katas/python/Core%20Transforms/Flatten/Flatten/task.py#L29
>>>
>>> On Fri, Oct 25, 2019 at 10:11 AM Sam Rohde  wrote:

 Talked to Daniel offline and it looks like the Python SDK is missing 
 PCollection Tuples like the one Java has: 
 https://github.com/rohdesamuel/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java.

 I'll go ahead and implement that for the Python SDK.

 On Thu, Oct 24, 2019 at 5:20 PM Sam Rohde  wrote:
>
> Hey All,
>
> I'm trying to implement an expand override with multiple output 
> PCollections. The kicker is that I want to insert a new transform for 
> each output PCollection. How can I do this?
>
> Regards,
> Sam


Re: JIRA priorities explanation

2019-10-25 Thread Robert Bradshaw
I'm fine with that, but in that case we should have a priority for
release blockers, below which bugs get automatically bumped to the
next release (and which becomes the burndown list).

On Fri, Oct 25, 2019 at 1:58 PM Kenneth Knowles  wrote:
>
> My takeaway from this thread is that priorities should have a shared 
> community intuition and/or policy around how they are treated, which could 
> eventually be formalized into SLOs.
>
> At a practical level, I do think that build breaks are higher priority than 
> release blockers. If you are on this thread but not looking at the PR, here 
> is the verbiage I added about urgency:
>
> P0/Blocker: "A P0 issue is more urgent than simply blocking the next release"
> P1/Critical: "Most critical bugs should block release"
> P2/Major: "No special urgency is associated"
> ...
>
> Kenn
>
> On Fri, Oct 25, 2019 at 11:46 AM Robert Bradshaw  wrote:
>>
>> We cut a release every 6 weeks, according to schedule, making it easy
>> to plan for, and the release manager typically sends out a warning
>> email to remind everyone. I don't think it makes sense to do that for
>> every ticket. Blockers should be reserved for things we really
>> shouldn't release without.
>>
>> On Fri, Oct 25, 2019 at 11:33 AM Pablo Estrada  wrote:
>> >
>> > I mentioned on the PR that I had been using the 'blocker' priority along 
>> > with the 'fix version' field to mark issues that I want to get in the 
>> > release.
>> > Of course, this little practice of mine only matters much around release 
>> > branch cutting time - and has been useful for me to track which things I 
>> > want to ensure getting into the release / bump to the next /etc.
>> > I've also found it to be useful as a way to communicate with the release 
>> > manager without having to sync directly.
>> >
>> > What would be a reasonable way to tell the release manager "I'd like to 
>> > get this feature in. please talk to me if you're about to cut the branch" 
>> > - that also uses the priorities appropriately? - and that allows the 
>> > release manager to know when a fix version is "more optional" / "less 
>> > optional"?
>> >
>> > On Wed, Oct 23, 2019 at 12:20 PM Kenneth Knowles  wrote:
>> >>
>> >> I finally got around to writing some of this up. It is minimal. Feedback 
>> >> is welcome, especially if what I have written does not accurately 
>> >> represent the community's approach.
>> >>
>> >> https://github.com/apache/beam/pull/9862
>> >>
>> >> Kenn
>> >>
>> >> On Mon, Feb 11, 2019 at 3:21 PM Daniel Oliveira  
>> >> wrote:
>> >>>
>> >>> Ah, sorry, I missed that Alex was just quoting from our Jira 
>> >>> installation (didn't read his email closely enough). Also I wasn't aware 
>> >>> about those pages on our website.
>> >>>
>> >>> Seeing as we do have definitions for our priorities, I guess my main 
>> >>> request would be that they be made more discoverable somehow. I don't 
>> >>> think the tooltips are reliable, and the pages on the website are 
>> >>> informative, but hard to find. Since it feels a bit lazy to say "this 
>> >>> isn't discoverable enough" without suggesting any improvements, I'd like 
>> >>> to propose these two changes:
>> >>>
>> >>> 1. We should write a Beam Jira Guide with basic information about our 
>> >>> Jira. I think the bug priorities should go in here, but also anything 
>> >>> else we would want someone to know before filing any Jira issues, like 
>> >>> how our components are organized or what the different issue types mean. 
>> >>> This guide could either be written in the website or the wiki, but I 
>> >>> think it should definitely be linked in 
>> >>> https://beam.apache.org/contribute/ so that newcomers read it before 
>> >>> getting their Jira account approved. The goal here being to have a 
>> >>> reference for the basics of our Jira since at the moment it doesn't seem 
>> >>> like we have anything for this.
>> >>>
>> >>> 2. The existing info on Post-commit and pre-commit policies doesn't seem 
>> >>> very discoverable to someone monitoring the Pre/Post-commits. I've 
>> >>> reported a handful of test-failures already and haven't seen this link 
>> >>> mentioned much.

Re: [DISCUSS] How to stop SdkWorker in SdkHarness

2019-10-25 Thread Robert Bradshaw
I think we'll still need approach (2) for when the pipeline finishes
and a runner is tearing down workers.
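
A minimal sketch of approach (2) -- treating control service termination as
the teardown signal, as defined further down the thread (all names here are
invented; the real SDK harness is more involved):

class SdkHarness(object):
  def __init__(self):
    self._bundle_processors = {}  # bundle descriptor id -> processor

  def on_control_stream_closed(self):
    # Approach 2: when the control service terminates, tear down the DoFns
    # of every cached bundle descriptor.
    for processor in self._bundle_processors.values():
      processor.teardown()
    self._bundle_processors.clear()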

On Fri, Oct 25, 2019 at 10:36 AM Maximilian Michels  wrote:
>
> Hi Jincheng,
>
> Thanks for bringing this up and capturing the ideas in the doc.
>
> Intuitively, I would have also considered adding a new Proto message for
> the teardown, but I think the idea to trigger this logic when the SDK
> Harness evicts process bundle descriptors is more elegant.
>
> Thanks,
> Max
>
> On 25.10.19 17:23, Luke Cwik wrote:
> > I like approach 3 since it doesn't add additional complexity to the API
> > and individual SDKs can choose to implement any clean-up strategy they
> > want or none at all which is the simplest.
> >
> > On Thu, Oct 24, 2019 at 8:46 PM jincheng sun wrote:
> >
> > Hi,
> >
> > Thanks for your comments in the doc; I have added Approach 3, which you
> > mentioned! @Luke
> >
> > For now, we need to decide between Approach 3 and Approach 1.
> > Details can be found in the doc [1].
> >
> > Anyone's feedback is welcome :)
> >
> > Regards,
> > Jincheng
> >
> > [1]
> > 
> > https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing
> >
> > jincheng sun wrote on Fri, Oct 25, 2019 at 10:40 AM:
> >
> > Hi,
> >
> > It is functionally similar to `abort`, but it will be called at the
> > end of the operator. So, I prefer `dispose` semantics, i.e., all
> > normal logic has been executed.
> >
> > Best,
> > Jincheng
> >
> > Harsh Vardhan <anan...@google.com> wrote on Wed, Oct 23, 2019 at 12:14 AM:
> >
> > Would approach 1 be akin to abort semantics?
> >
> > On Mon, Oct 21, 2019 at 8:01 PM jincheng sun
> > <sunjincheng...@gmail.com> wrote:
> >
> > Hi Luke,
> >
> > Thanks a lot for your reply. Since it allows to share
> > one SDK harness between multiple executable stages, the
> > control service termination may occur much later than
> > the completion of an executable stage. This is the main
> > reason I prefer runners to control the teardown of DoFns.
> >
> > Regarding "SDK harnesses can terminate instances any
> > time they want and start new instances anytime as
> > well.", personally I think it doesn't conflict with the
> > proposed Approach 1, as the SDK harness could decide what
> > to do when receiving the teardown request. It could do
> > nothing if the DoFns have already been torn down, and
> > could also tear down the DoFns if needed.
> >
> > What do you think?
> >
> > Best,
> > Jincheng
> >
> > Luke Cwik <lc...@google.com> wrote on Tue, Oct 22, 2019 at 2:05 AM:
> >
> > Approach 2 is currently the suggested approach[1]
> > for DoFn's to shutdown.
> > Note that SDK harnesses can terminate instances any
> > time they want and start new instances anytime as well.
> >
> > Why do you want to expose this logic so that Runners
> > could control it?
> >
> > 1:
> > 
> > https://docs.google.com/document/d/1n6s3BOxOPct3uF4UgbbI9O9rpdiKWFH9R6mtVmR7xp0/edit#
> >
> > On Mon, Oct 21, 2019 at 4:27 AM jincheng sun wrote:
> >
> > Hi,
> > I found that `SdkHarness` does not stop the
> > `SdkWorker` when it finishes. We should add
> > logic to stop the `SdkWorker` in `SdkHarness`.
> > More detail can be found in [1].
> >
> > There are two approaches to solve this issue:
> >
> > Approach 1: We can add a Fn API for teardown
> > purposes, and the runner will tear down a specific
> > bundle descriptor via this teardown Fn API
> > during disposal.
> > Approach 2: The control service termination
> > could be seen as a signal, and once the SDK harness
> > receives this signal, the teardown of the bundle
> > descriptor will be performed.
> >
> > More detail can be found in [2].
> >
> > As for Approach 2, the SDK harness could be shared
> > between multiple executable stages. The control
> > service termination only occurs when all the
> >

Re: Python Precommit duration pushing 2 hours

2019-10-25 Thread Robert Bradshaw
It looks like fn_api_runner_test.py is quite expensive, taking 10-15+
minutes on each version of Python. This test consists of a base class
that is basically a validates runner suite, and is then run in several
configurations, many more of which (including some expensive ones)
have been added lately.

class FnApiRunnerTest(unittest.TestCase):
class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
class FnApiRunnerTestWithDisabledCaching(FnApiRunnerTest):
class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest):
class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
class FnApiRunnerTestWithBundleRepeat(FnApiRunnerTest):
class FnApiRunnerTestWithBundleRepeatAndMultiWorkers(FnApiRunnerTest):

I'm not convinced we need to run all of these permutations, or at
least not all tests in all permutations.
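
One way to trim this (an illustrative sketch, not a concrete proposal) is to
keep full coverage in the base configuration and skip the expensive tests in
the derived ones:

import unittest

class FnApiRunnerTest(unittest.TestCase):
  def test_cheap_thing(self):
    pass  # runs in every configuration

  def test_expensive_thing(self):
    pass  # full coverage in the base configuration only

class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
  def test_expensive_thing(self):
    self.skipTest('covered by the base FnApiRunnerTest configuration')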

On Fri, Oct 25, 2019 at 10:57 AM Valentyn Tymofieiev
 wrote:
>
> I took another look at this and precommit ITs are already running in 
> parallel, albeit in the same suite. However it appears Python precommits 
> became slower, especially Python 2 precommits [35 min per suite x 3 suites], 
> see [1]. Not sure yet what caused the increase, but precommits used to be 
> faster. Perhaps we have added a slow test or a lot of new tests.
>
> [1] https://scans.gradle.com/s/jvcw5fpqfc64k/timeline?task=ancsbov425524
>
> On Thu, Oct 24, 2019 at 4:53 PM Ahmet Altay  wrote:
>>
>> Ack. Separating precommit ITs into a different suite sounds good. Is anyone 
>> interested in doing that?
>>
>> On Thu, Oct 24, 2019 at 2:41 PM Valentyn Tymofieiev  
>> wrote:
>>>
>>> This should not increase the queue time substantially, since precommit ITs 
>>> are running sequentially with precommit tests, unlike multiple precommit 
>>> tests which run in parallel to each other.
>>>
>>> The precommit ITs we run are batch and streaming wordcount tests on Py2 and 
>>> one Py3 version, so it's not a lot of tests.
>>>
>>> On Thu, Oct 24, 2019 at 1:07 PM Ahmet Altay  wrote:

 +1 to separating ITs from precommit. The downside: when Chad tried to 
 do something similar [1], it was noted that the total time to run all 
 precommit tests would increase, potentially also increasing the queue 
 time.

 Another alternative: we could run a smaller set of IT tests in precommits 
 and run the whole suite as part of postcommit tests.

 [1] https://github.com/apache/beam/pull/9642

 On Thu, Oct 24, 2019 at 12:15 PM Valentyn Tymofieiev  
 wrote:
>
> One improvement could be move to Precommit IT tests into a separate suite 
> from precommit tests, and run it in parallel.
>
> On Thu, Oct 24, 2019 at 11:41 AM Brian Hulette  
> wrote:
>>
>> Python Precommits are taking quite a while now [1]. Just visually it 
>> looks like the average length is 1.5h or so, but it spikes up to 2h. 
>> I've had several precommit runs get aborted due to the 2 hour limit.
>>
>> It looks like there was a spike up above 1h back on 9/6 and the duration 
>> has been steadily rising since then. Is there anything we can do about 
>> this?
>>
>> Brian
>>
>> [1] 
>> http://104.154.241.245/d/_TNndF2iz/pre-commit-test-latency?orgId=1=now-90d=now=4


Re: JIRA priorities explanation

2019-10-25 Thread Robert Bradshaw
We cut a release every 6 weeks, according to schedule, making it easy
to plan for, and the release manager typically sends out a warning
email to remind everyone. I don't think it makes sense to do that for
every ticket. Blockers should be reserved for things we really
shouldn't release without.

On Fri, Oct 25, 2019 at 11:33 AM Pablo Estrada  wrote:
>
> I mentioned on the PR that I had been using the 'blocker' priority along with 
> the 'fix version' field to mark issues that I want to get in the release.
> Of course, this little practice of mine only matters much around release 
> branch cutting time - and has been useful for me to track which things I want 
> to ensure getting into the release / bump to the next /etc.
> I've also found it to be useful as a way to communicate with the release 
> manager without having to sync directly.
>
> What would be a reasonable way to tell the release manager "I'd like to get 
> this feature in. please talk to me if you're about to cut the branch" - that 
> also uses the priorities appropriately? - and that allows the release manager 
> to know when a fix version is "more optional" / "less optional"?
>
> On Wed, Oct 23, 2019 at 12:20 PM Kenneth Knowles  wrote:
>>
>> I finally got around to writing some of this up. It is minimal. Feedback is 
>> welcome, especially if what I have written does not accurately represent the 
>> community's approach.
>>
>> https://github.com/apache/beam/pull/9862
>>
>> Kenn
>>
>> On Mon, Feb 11, 2019 at 3:21 PM Daniel Oliveira  
>> wrote:
>>>
>>> Ah, sorry, I missed that Alex was just quoting from our Jira installation 
>>> (didn't read his email closely enough). Also I wasn't aware about those 
>>> pages on our website.
>>>
>>> Seeing as we do have definitions for our priorities, I guess my main 
>>> request would be that they be made more discoverable somehow. I don't think 
>>> the tooltips are reliable, and the pages on the website are informative, 
>>> but hard to find. Since it feels a bit lazy to say "this isn't discoverable 
>>> enough" without suggesting any improvements, I'd like to propose these two 
>>> changes:
>>>
>>> 1. We should write a Beam Jira Guide with basic information about our Jira. 
>>> I think the bug priorities should go in here, but also anything else we 
>>> would want someone to know before filing any Jira issues, like how our 
>>> components are organized or what the different issue types mean. This guide 
>>> could either be written in the website or the wiki, but I think it should 
>>> definitely be linked in https://beam.apache.org/contribute/ so that 
>>> newcomers read it before getting their Jira account approved. The goal here 
>>> being to have a reference for the basics of our Jira since at the moment it 
>>> doesn't seem like we have anything for this.
>>>
>>> 2. The existing info on Post-commit and pre-commit policies doesn't seem 
>>> very discoverable to someone monitoring the Pre/Post-commits. I've reported 
>>> a handful of test-failures already and haven't seen this link mentioned 
>>> much. We should try to find a way to funnel people towards this link when 
>>> there's an issue, the same way we try to funnel people towards the 
>>> contribution guide when they write a PR. As a note, while writing this 
>>> email I remembered this link that someone gave me before 
>>> (https://s.apache.org/beam-test-failure). That mentions the Post-commit 
>>> policies page, so maybe it's just a matter of pasting that all over our 
>>> Jenkins builds whenever we have a failing test?
>>>
>>> PS: I'm also definitely for SLOs, but I figure it's probably better 
>>> discussed in a separate thread so I'm trying to stick to the subject of 
>>> priority definitions.
>>>
>>> On Mon, Feb 11, 2019 at 9:17 AM Scott Wegner  wrote:

 Thanks for driving this discussion. I also was not aware of these existing 
 definitions. Once we agree on the terms, let's add them to our Contributor 
 Guide and start using them.

 +1 in general; I like both Alex and Kenn's definitions; Additional 
 wordsmithing could be moved to a Pull Request. Can we make the definitions 
 useful for both the person filing a bug, and the assignee, i.e.

 : . 
 

 On Sun, Feb 10, 2019 at 7:49 PM Kenneth Knowles  wrote:
>
> The content that Alex posted* is the definition from our Jira 
> installation anyhow.
>
> I just searched around, and there's 
> https://community.atlassian.com/t5/Jira-questions/According-to-Jira-What-is-Blocker-Critical-Major-Minor-and/qaq-p/668774
>  which makes clear that this is really user-defined, since Jira has many 
> deployments with their own configs.
>
> I guess what I want to know about this thread is what action is being 
> proposed?
>
> Previously, there was a thread that resulted in 
> https://beam.apache.org/contribute/precommit-policies/ and 
> https://beam.apache.org/contribute/postcommits-policies/. 

Re: Interactive Beam Example Failing [BEAM-8451]

2019-10-21 Thread Robert Bradshaw
Thanks for trying this out. Yes, this is definitely something that
should be supported (and tested).

On Mon, Oct 21, 2019 at 3:40 PM Igor Durovic  wrote:
>
> Hi everyone,
>
> The interactive beam example using the DirectRunner fails after execution of 
> the last cell. The recursion limit is exceeded during the calculation of the 
> cache label because of a circular reference in the PipelineInfo object.
>
> The constructor for the PipelineInfo class creates a mapping from each 
> pcollection to the transforms that produce and consume it. The issue arises 
> when there exists a transform that is both a producer and a consumer for the 
> same pcollection. This occurs when a transform's expand method returns the 
> same pcoll object that's passed into it. The specific transform causing the 
> failure of the example is MaybeReshuffle, which is used in the Create 
> transform. Replacing "return pcoll" with "return pcoll | Map(lambda x: x)" 
> seems to fix the problem.
>
> A workaround for this issue on the interactive beam side would be fairly 
> simple, but it seems to me that there should be more validation of pipelines 
> to prevent the use of transforms that return the same pcoll that's passed in, 
> or at least a mention of this in the transform style guide. My understanding 
> is that pcollections are produced by a single transform (they even have a 
> field called "producer" that references only one transform). If that's the 
> case then that property of pcollections should be enforced.
>
> I made ticket BEAM-8451 to track this issue.
>
> I'm still new to beam so I apologize if I'm fundamentally misunderstanding 
> something. I'm not exactly sure what the next step should be and would 
> appreciate some recommendations. I can submit a PR to solve the immediate 
> problem of the failing example but the underlying problem should also be 
> addressed at some point. I also apologize if people are already aware of this 
> problem.
>
> Thank You!
> Igor Durovic
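
For reference, a minimal sketch of the pattern described above, next to the
workaround mentioned (transform names are illustrative; the actual offender
is MaybeReshuffle inside Create):

import apache_beam as beam

class PassThrough(beam.PTransform):
  def expand(self, pcoll):
    # Problematic: the returned pcoll is the same object that was passed in,
    # so this transform becomes both producer and consumer of one PCollection.
    return pcoll

class PassThroughFixed(beam.PTransform):
  def expand(self, pcoll):
    # Workaround from the message above: an identity Map produces a new
    # PCollection, breaking the circular reference.
    return pcoll | beam.Map(lambda x: x)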


Re: Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Robert Bradshaw
I just merged https://github.com/apache/beam/pull/9845 which should
resolve the issue.

On Mon, Oct 21, 2019 at 12:58 PM Chad Dombrova  wrote:
>
> thanks!
>
> On Mon, Oct 21, 2019 at 12:47 PM Kyle Weaver  wrote:
>>
>> This issue is being tracked at 
>> https://issues.apache.org/jira/browse/BEAM-8416.
>>
>> On Mon, Oct 21, 2019 at 9:42 PM Chad Dombrova  wrote:
>>>
>>> Hi all,
>>> Is anyone else getting these errors in 
>>> apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?
>>>
>>> They seem to be taking two forms:
>>>
>>> zipfile.BadZipFile: Bad CRC-32 for file 
>>> '/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'
>>>
>>> zipfile.BadZipFile: Bad magic number for file header
>>>
>>> Here are some gradle scans:
>>>
>>> https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473
>>>
>>> https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749
>>>
>>> I got it to go through eventually after 4 tries.
>>>
>>> -chad


Re: Are empty bundles allowed by model?

2019-10-21 Thread Robert Bradshaw
Yes, the test should be fixed.

On Mon, Oct 21, 2019 at 11:20 AM Jan Lukavský  wrote:
>
> Hi Robert,
>
> I thought that would be the case. ParDoLifecycleTest, however, does not
> currently allow for empty bundles. We have currently worked around this
> in Flink by avoiding the creation of these bundles, but maybe the test
> should be modified so that it adheres to the model [1].
>
> Jan
>
> [1] https://github.com/apache/beam/pull/9846
>
> On 10/21/19 6:00 PM, Robert Bradshaw wrote:
> > Yes, the model allows them.
> >
> > It also takes work to avoid them in general (e.g. imagine one
> > reshuffles N elements to M > N workers). A priori, one would "start" a
> > bundle and then try to read all data destined for that
> > worker--postponing this until one knows that the set of data for this
> > worker is non-empty could be an optimization (as could not doing so, as
> > a form of speculative execution), but it should not be necessary.
> >
> > - Robert
> >
> > On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
> >> Hi Max,
> >>
> >> that is true, but then we have two orthogonal issues:
> >>
> >>a) correctness - if empty bundles are aligned with the model, then
> >> validates runner tests should take that into account
> >>
> >>b) performance - that can be dealt with in a separate JIRA issue, if 
> >> needed
> >>
> >> WDYT?
> >>
> >> Jan
> >>
> >> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> >>> Hi Jan,
> >>>
> >>> I think it is aligned with the model to create empty bundles. The
> >>> question is, of course, whether it is preferable to avoid them, since the
> >>> Setup/Finish state might be costly, depending on the bundle size and
> >>> the type of DoFn used.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 21.10.19 14:13, Kyle Weaver wrote:
> >>>> Nevermind, this is discussed on the PR linked.
> >>>>
> >>>> On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver <kcwea...@google.com> wrote:
> >>>>
> >>>>  Do you know why an empty bundle might be created?
> >>>>
> >>>>  On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>>  Hi,
> >>>>
> >>>>  when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
> >>>>  found a
> >>>>  situation where Flink might create an empty bundle - i.e. call
> >>>>  @StartBundle immediately followed by @FinishBundle, with no
> >>>>  elements
> >>>>  inside the bundle. That is what breaks the ParDoLifecycleTest,
> >>>>  because
> >>>>  the test explicitly assumes that the sequence of lifecycle
> >>>> methods
> >>>>  should be StartBundle -> Process Element -> Finish Bundle. It is
> >>>>  easy to
> >>>>  modify the test to accept the situation of StartBundle ->
> >>>>  FinishBundle with
> >>>>  no elements ([1]), but the question is, is this allowed by the
> >>>>  model? I
> >>>>  think there is no reason not to be, but I'd like to be sure.
> >>>>
> >>>>  Thanks,
> >>>>
> >>>> Jan
> >>>>
> >>>>  [1] https://github.com/apache/beam/pull/9841
> >>>>


Re: Are empty bundles allowed by model?

2019-10-21 Thread Robert Bradshaw
Yes, the model allows them.

It also takes work to avoid them in general (e.g. imagine one
reshuffles N elements to M > N workers). A priori, one would "start" a
bundle and then try to read all data destined for that
worker--postponing this until one knows that the set of data for this
worker is non-empty could be an optimization (as could not doing so, as
a form of speculative execution), but it should not be necessary.

- Robert
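
In SDK terms this means a DoFn has to tolerate @StartBundle being followed
immediately by @FinishBundle. A minimal Python sketch of a DoFn written that
way (names illustrative):

import apache_beam as beam

class BufferingDoFn(beam.DoFn):
  def start_bundle(self):
    # May be followed directly by finish_bundle with no elements in between.
    self._buffer = []

  def process(self, element):
    self._buffer.append(element)

  def finish_bundle(self):
    # Must not assume process() was called at least once.
    for element in self._buffer:
      pass  # flush: write, emit, etc.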

On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
>
> Hi Max,
>
> that is true, but then we have two orthogonal issues:
>
>   a) correctness - if empty bundles are aligned with the model, then
> validates runner tests should take that into account
>
>   b) performance - that can be dealt with in a separate JIRA issue, if needed
>
> WDYT?
>
> Jan
>
> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > I think it is aligned with the model to create empty bundles. The
> > question is, of course, whether it is preferable to avoid them, since the
> > Setup/Finish state might be costly, depending on the bundle size and
> > the type of DoFn used.
> >
> > Cheers,
> > Max
> >
> > On 21.10.19 14:13, Kyle Weaver wrote:
> >> Nevermind, this is discussed on the PR linked.
> >>
> >> On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver wrote:
> >>
> >> Do you know why an empty bundle might be created?
> >>
> >> On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský wrote:
> >>
> >> Hi,
> >>
> >> when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
> >> found a
> >> situation where Flink might create an empty bundle - i.e. call
> >> @StartBundle immediately followed by @FinishBundle, with no
> >> elements
> >> inside the bundle. That is what breaks the ParDoLifecycleTest,
> >> because
> >> the test explicitly assumes that the sequence of lifecycle
> >> methods
> >> should be StartBundle -> Process Element -> Finish Bundle. It is
> >> easy to
> >> modify the test to accept the situation of StartBundle ->
> >> FinishBundle with
> >> no elements ([1]), but the question is, is this allowed by the
> >> model? I
> >> think there is no reason not to be, but I'd like to be sure.
> >>
> >> Thanks,
> >>
> >>Jan
> >>
> >> [1] https://github.com/apache/beam/pull/9841
> >>


Re: Python SDK timestamp precision

2019-10-18 Thread Robert Bradshaw
TL;DR: We should just settle on nanosecond precision ubiquitously for
timestamp/windowing in Beam.


Re-visiting this discussion in light of cross-language transforms and
runners, and trying to tighten up testing. I've spent some more time
thinking about how we could make these operations granularity-agnostic, but
just can't find a good solution. In particular, the sticklers seem to be:

(1) Windows are half-open intervals, and the timestamp associated with a
window coming out of a GBK is (by default) as large as possible but must
live in that window. (Otherwise WindowInto + GBK + WindowInto would have
the unfortunate effect of moving aggregate values into subsequent windows,
which is clearly not the intent.) In other words, the timestamp of a
grouped value is basically End(Window) - epsilon. Unless we choose a
representation able to encode "minus epsilon" we must agree on a
granularity.
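
To make (1) concrete, plain integer arithmetic (no Beam APIs assumed) shows
how "End(Window) - epsilon" depends on the granularity chosen:

# Largest timestamp inside the half-open window [0s, 60s), per granularity.
# Each is a different instant, so all SDKs and runners must agree on one.
end_sec = 60
max_ts_millis = end_sec * 1000 - 1         # 59999 ms
max_ts_micros = end_sec * 1000 * 1000 - 1  # 59999999 us
max_ts_nanos = end_sec * 10**9 - 1         # 59999999999 ns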

(2) Unless we want to have multiple variants of all our WindowFns (e.g.
FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a
granularity with which to parameterize these well-known operations. There
are cases (e.g. side input window mapping, merging) where these Fns may be
used downstream in contexts other than where they are applied/defined.
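
And for (2), a sketch of FixedWindows-style assignment at an assumed
microsecond granularity; the unit is baked into the arithmetic, which is why
the WindowFn and everything downstream of it must agree on it:

def assign_fixed_window(timestamp_us, size_us, offset_us=0):
  # Returns the half-open interval [start, end) containing timestamp_us.
  start = timestamp_us - (timestamp_us - offset_us) % size_us
  return (start, start + size_us)

assert assign_fixed_window(59999999, 60 * 1000 * 1000) == (0, 60000000)
assert assign_fixed_window(60000000, 60 * 1000 * 1000) == (60000000, 120000000)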

(3) Reification of the timestamp into user-visible data, and the other way
around, require a choice of precision to expose to the user. This means
that the timestamp is actual data, and truncating/rounding cannot be done
implicitly. Also round trip of reification and application of timestamps
should hopefully be idempotent no matter the SDK.

The closest I've come is possibly parameterizing the timestamp type, where
encoding, decoding (including pulling the end out of a window?), comparison
(against each other and a watermark), "minus epsilon", etc could be UDFs.
Possibly we'd need the full set of arithmetic operations to implement
FixedWindows on an unknown timestamp type. Reification would simply be
dis-allowed (or return an opaque rather than SDK-native) type if the SDK
did not know that window type. The fact that one might need comparison
between timestamps of different types, or (lossless) coercion from one type
to another, means that timestamp types need to know about each other, or
another entity needs to know about the full cross-product, unless there is
a common base-type (at which point we might as well always choose that).

An intermediate solution is to settle on floating (decimal) point
representation, plus a "minus-epsilon" bit. It wouldn't quite solve the
mapping through SDK-native types (which could require rounding or errors or
a new opaque type, and few date libraries could faithfully expose the minus
epsilon part). It might also be more expensive (compute and storage), and
would not allow us to use the protobuf timestamp/duration fields (or any
standard date/time libraries).

Unless we can come up with a clean solution to the issues above shortly, I
think we should fix a precision and move forward. If this makes sense to
everyone, then we can start talking about the specific choice of precision
and a migration path (possibly only for portability).


For reference, the manipulations we do on timestamps are:

WindowInto: Timestamp -> Window
TimestampCombine: Window, [Timestamp] -> Timestamp
End(Window)
Min(Timestamps)
Max(Timestamps)
PastEndOfWindow: Watermark, Window -> {True, False}

[SideInput]WindowMappingFn: Window -> Window
WindowInto(End(Window))

GetTimestamp: Timestamp -> SDK Native Object
EmitAtTimestamp: SDK Native Object -> Timestamp






On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw  wrote:

> On Thu, May 9, 2019 at 9:32 AM Kenneth Knowles wrote:
>
> > From: Robert Bradshaw 
> > Date: Wed, May 8, 2019 at 3:00 PM
> > To: dev
> >
> >> From: Kenneth Knowles 
> >> Date: Wed, May 8, 2019 at 6:50 PM
> >> To: dev
> >>
> >> >> The end-of-window, for firing, can be approximate, but it seems it
> >> >> should be exact for timestamp assignment of the result (and similarly
> >> >> with the other timestamp combiners).
> >> >
> >> > I was thinking that the window itself should be stored as exact data,
> while just the firing itself is approximated, since it already is, because
> of watermarks and timers.
> >>
> >> I think this works where we can compare encoded windows, but some
> >> portable interpretation of windows is required for runner-side
> >> implementation of merging windows (for example).
> >
> > But in this case, you've recognized the URN of the WindowFn anyhow, so
> you understand its windows. Remembering that IntervalWindow is just one
> choice, and that windows themselves are totally user-defined and that
> merging logic is completely arbitrary per WindowFn (we 

Re: RFC: Assigning environments to transforms in a pipeline

2019-10-16 Thread Robert Bradshaw
Sounds nice. Is there a design doc (or, perhaps, you could just give an
example of what this would look like in this thread)?

On Wed, Oct 16, 2019 at 5:51 PM Chad Dombrova  wrote:

> Hi all,
> One of our goals for the portability framework is to be able to assign
> different environments to different segments of a pipeline.  This is not
> possible right now because environments are a concept that really only
> exist in the portable runner as protobuf messages:  they lack a proper API
> on the pipeline definition side of things.
>
> As a first step toward our goal, one of our team members just created a
> PR[1] exposing environments as a proper class hierarchy, akin to
> PTransforms, PCollections, Coders, etc. It's quite straightforward, and we
> were careful to adhere to existing patterns for similar types, so hopefully
> the end result feels natural.  After this PR is merged, our next step will
> be to create a proposal for assigning environments to transforms.
>
> Let us know what you think!
>
> -chad
>
> [1] https://github.com/apache/beam/pull/9811
>
>


Re: [design] A streaming Fn API runner for Python

2019-10-15 Thread Robert Bradshaw
Very excited to see this! I've added some comments to the doc.

On Tue, Oct 15, 2019 at 3:43 PM Pablo Estrada  wrote:

> I've just been informed that access wasn't open. I've since opened access
> to it.
> Thanks
> -P.
>
> On Tue, Oct 15, 2019 at 2:10 PM Pablo Estrada  wrote:
>
>> Hello all,
>> I am planning to work on removing the old BundleBasedDirectRunner, and
>> expand the FnApiRunner to work on streaming as well as batch.
>> Currently, the FnApiRunner orders the processing graph topologically, and
>> "pushes" all the data through each stage in topological order (deferred
>> inputs such as residuals and timers are immediately pushed to the SDK as
>> well).
>> The new design would change from this
>> push-all-data-through-topologically-sorted-stages model to having queues
>> for "bundles", or for elements that are awaiting processing, and routing
>> them to the appropriate bundle processing subgraph.
>>
>> The design is here: http://s.apache.org/streaming-fn-runner-py
>>
>> I expect
>>
>> I'd appreciate comments and everything : )
>> Best
>> -P.
>>
>
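
For a rough feel of the queue-based model described above, a toy simulation
(invented names, not the actual design):

from collections import deque

def run(stages, consumers, initial_bundles):
  # stages: name -> function taking a bundle and returning an output bundle.
  # consumers: name -> list of downstream stage names.
  # initial_bundles: name -> list of bundles seeding that stage's queue.
  queues = {name: deque(initial_bundles.get(name, [])) for name in stages}
  while any(queues.values()):
    name = next(n for n, q in queues.items() if q)
    output = stages[name](queues[name].popleft())
    for downstream in consumers.get(name, []):
      queues[downstream].append(output)

result = []
run({'double': lambda bundle: [x * 2 for x in bundle],
     'collect': lambda bundle: result.extend(bundle) or bundle},
    {'double': ['collect']},
    {'double': [[1, 2, 3]]})
assert result == [2, 4, 6]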


Re: So much green

2019-10-11 Thread Robert Bradshaw
Very nice to see, thanks for sharing.

On Fri, Oct 11, 2019 at 5:44 AM Maximilian Michels  wrote:
>
> Glad to see that we have fixed the recent flakes. Let's keep up the good
> work :)
>
> -Max
>
> On 10.10.19 23:37, Kenneth Knowles wrote:
> > All the cells in the pull request template are green right now and have
> > been for most of the afternoon & evening. I just thought I would share
> > this wonderful fact with everyone. Seeing it really makes an impression!
> >
> > Kenn


Re: Python thread pool executor for Apache Beam

2019-10-11 Thread Robert Bradshaw
Can we use a lower default timeout to mitigate this issue in the short
term (I'd imagine one second or possibly smaller would be sufficient
for our use), and get a fix upstream in the long term?
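
For reference, the idle-timeout behavior under discussion, sketched with only
the standard library (all names illustrative; a real pool would track idle
workers rather than always spawning):

import queue
import threading

class IdleTimeoutPool(object):
  def __init__(self, idle_timeout=1.0):
    self._tasks = queue.Queue()
    self._idle_timeout = idle_timeout

  def submit(self, fn, *args):
    self._tasks.put((fn, args))
    # Simplification: always spawn; an idle worker may grab the task first,
    # in which case the new thread simply times out and exits.
    threading.Thread(target=self._worker, daemon=True).start()

  def _worker(self):
    while True:
      try:
        fn, args = self._tasks.get(timeout=self._idle_timeout)
      except queue.Empty:
        return  # idle longer than the timeout; let the thread die
      fn(*args)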

On Fri, Oct 11, 2019 at 9:38 AM Luke Cwik  wrote:
>
> I'm looking for a thread pool that re-uses threads that are idle before 
> creating new ones and has an API that is compatible with the 
> concurrent.futures ThreadPoolExecutor[1].
>
> To my knowledge, the concurrent.futures ThreadPool creates new threads for 
> tasks up until the thread pool limit before re-using existing ones for all 
> Python versions prior to 3.8.
>
> I tried using CollapsingThreadPoolExecutor within pr/9477[2] but after 
> testing it with Apache Beam, I found that it has some pool shutdown issues[3].
>
> Does anyone have any suggestions for a good Python library that contains a 
> stable thread pool implementation?
>
> Preferably the library that provides the thread pool would have no 
> dependencies and be compatible with the same Python versions that Apache Beam 
> is compatible with today.
>
> 1: 
> https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
> 2: https://github.com/apache/beam/pull/9477
> 3: https://github.com/ftpsolutions/collapsing-thread-pool-executor/issues/3


Re: Beam Python fails to run on macOS 10.15?

2019-10-10 Thread Robert Bradshaw
Looks like an issue with the protobuf library. Do you know what
version of protobuf you're using? (E.g. by running pip freeze.)

I don't have Catalina to test this on, but it'd be useful if you could
winnow this down to the import that fails.
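
For anyone reproducing this, a quick way to check the version and to start
winnowing (the generated-proto import below is only an example of where to
begin, not a diagnosis):

import google.protobuf
print(google.protobuf.__version__)

# Then bisect, importing one suspect submodule at a time, e.g.:
# import apache_beam.portability.api.beam_runner_api_pb2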

On Thu, Oct 10, 2019 at 8:15 AM Kamil Wasilewski
 wrote:
>
> Hi all,
>
> I've recently updated my macOS to 10.15 Catalina. Since then, I have the 
> following error when I try to import the apache_beam package (both in Python 2.7 
> and 3.x):
>
> >>> import apache_beam
> [libprotobuf ERROR google/protobuf/descriptor_database.cc:58] File already 
> exists in database:
> [libprotobuf FATAL google/protobuf/descriptor.cc:1370] CHECK failed: 
> GeneratedDatabase()->Add(encoded_file_descriptor, size):
> libc++abi.dylib: terminating with uncaught exception of type 
> google::protobuf::FatalException: CHECK failed: 
> GeneratedDatabase()->Add(encoded_file_descriptor, size):
>
> [1]43669 abort  python
>
> Does anyone have the same problem? I saw that someone else had the same error 
> on the beam-python Slack channel, so I guess this problem is not limited to 
> my workstation.
>
> Thanks,
> Kamil


Re: [spark structured streaming runner] merge to master?

2019-10-10 Thread Robert Bradshaw
On Thu, Oct 10, 2019 at 12:39 AM Etienne Chauchot  wrote:
>
> Hi guys,
>
> You probably know that for several months there has been work
> developing a new Spark runner based on the Spark Structured Streaming
> framework. This work is located in a feature branch here:
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>
> To attract more contributors and get some user feedback, we think it is 
> time to merge it to master. Before doing so, some steps need to be completed:
>
> - finish the work on Spark Encoders (which allow calling Beam coders), 
> because right now the runner is in an unstable state (some transforms 
> use the new way of doing ser/de and some use the old one, making a 
> pipeline inconsistent with respect to serialization)
>
> - clean history: The history contains commits from November 2018, so
> there is a good amount of work, and thus a considerable number of commits.
> They were already squashed, but not those from September 2019 onward.

I don't think the number of commits should be an issue--we shouldn't
just squash years worth of history away. (OTOH, if this is a case of
this branch containing lots of little, irrelevant commits that would
have normally been squashed away in the normal review process we do
for the main branch, then, yes, some cleanup could be nice.)

> Regarding status:
>
> - the runner passes 89% of the validates runner tests in batch mode. We
> hope to pass more with the new Encoders
>
> - Streaming mode is barely started (waiting for the multi-aggregations
> support in spark SS framework from the Spark community)
>
> - Runner can execute Nexmark
>
> - Some things are not wired up yet
>
>  - Beam Schemas not wired with Spark Schemas
>
>  - Optional features of the model not implemented:  state api, timer
> api, splittable doFn api, …
>
> WDYT, can we merge it to master once the 2 steps are done ?

I think that as long as it sits parallel to the existing runner, and
is clearly marked with its status, it makes sense to me. How many
changes does it make to the existing codebase (as opposed to add new
code)?


Re: [portability] Removing the old portable metrics API...

2019-10-09 Thread Robert Bradshaw
To clarify on the Dataflow story, there are three Dataflow (worker)
implementations: the legacy non-portable one, the Portability Java Runner
Harness (currently used by Python Streaming), and the Portability Unified
Worker (intended to replace the Java Runner Harness). MonitoringInfos
doesn't apply to the legacy one, is implemented for the JRH, but the UW
still uses the old-style counters.

So, I think we can't delete the old-style yet, but hopefully soon (pending
Go + UW support--it's on the roadmaps but I don't have an ETA).

On Wed, Oct 9, 2019 at 11:40 AM Alex Amato  wrote:

> @Robert Bradshaw  Dataflow is updated to use
> MonitoringInfos.
>
> This is specifically referring to the FN API Layer. Beam Python and Beam
> Java export metrics using the new APIs. And the DataflowRunner harness is
> consuming and using those. When I was removed from that project, most of
> the metrics were implemented in the
> Python and Java SDKs as MonitoringInfos.
>
>
>
> Metric                   | Java SDK           | Python SDK | Go SDK
> -------------------------|--------------------|------------|--------------
> User Counters            | Done               | Done       | Legacy FN API
> User Distributions       | Done               | Done       | Legacy FN API
> Execution Time Start     | Done               | Done       | Not Started
> Execution Time Process() | Done               | Done       | Not Started
> Execution Time Finish()  | Done               | Done       | Not Started
> Element Count            | Done               | Done       | Legacy FN API
> Sampled PColl Byte Size  | Pending (PR/8416,  | Done       | Legacy FN API
>                          | handoff: BEAM-7462)|            |
>
> PR/8416: https://github.com/apache/beam/pull/8416
> BEAM-7462: https://issues.apache.org/jira/browse/BEAM-7462?filter=-2
>
> And the Dataflow Java Runner Harness was consuming this. +Mikhail
> Gryzykhin  implemented the runner harness layer.
>
> To delete the deprecated stuff, we would need to get the Go SDK on
> MonitoringInfos for what it has implemented so far.
>
> Integration test coverage could be increased. But we wrote this test
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline.py>
> .
>
>
> On Wed, Oct 9, 2019 at 10:51 AM Luke Cwik  wrote:
>
>> One way would be to report both so this way we don't need to update the
>> Dataflow Java implementation but other runners using the new API get all
>> the metrics.
>>
>> On Mon, Oct 7, 2019 at 10:00 AM Robert Bradshaw 
>> wrote:
>>
>>> Yes, Dataflow still uses the old API, for both counters and for its
>>> progress/autoscaling mechanisms. We'd need to convert that over as
>>> well (which is on the TODO list but lower than finishing up support
>>> for portability in general).
>>>
>>> On Mon, Oct 7, 2019 at 9:56 AM Robert Burke  wrote:
>>> >
>>> > The Go SDK uses the old API [1], but it shouldn't be too hard to
>>> migrate it.
>>> >
>>> > The main thing I'd want to do at the same time is move the
>>> dependencies on the protos out of that package and have those live only in
>>> the harness package [2]. I wasn't aware of that particular separation of
>>> concerns until much later, but allows for alternative harness
>>> implementations.
>>> >
>>> > I have some other work to get the Per-DoFn profiling metrics (element
>>> count, size, time) into the Go SDK this quarter, so I can handle this then.
>>> >
>>> > [1]
>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/metrics/metrics.go#L474
>>> > [2]
>>> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/core/runtime/harness
>>> >
>>> > On Fri, Oct 4, 2019 at 6:14 PM Pablo Estrada 
>>> wrote:
>>> >>
>>> >> Hello devs,
>>> >> I recently took a look at how Dataflow is retrieving metrics from the
>>> Beam SDK harnesses, and noticed something. As you may (or may not)
>>> remember, the portability API currently has two ways of reporting metrics.
>>> Namely, the newer MonitoringInfo API[1], and the older Metrics one[2].
>>> >>
>>> >> This is somewhat troublesome because now we have two things that do
>>> the same thing. The SDKs report double the amount of metrics[3][4], and I
>>> bet it's confusing for runner implementers.
>>> >>
>>> >> Luckily, it seems like the Flink and Spark runners do use the new API
>>> [5][6] - yay! : ) - so I guess then the only runner that uses the old API
>>> is Dataflow? (internally)
>>> >>
>>> 

Re: Please comment on draft comms strategy by Oct 16

2019-10-09 Thread Robert Bradshaw
Probably worth mentioning Slack and StackOverflow as well.

On Wed, Oct 9, 2019 at 3:59 PM María Cruz  wrote:
>
> Hi all,
> sorry for multiple messages. I realized after sending the first email that a 
> new thread with a different subject was probably more efficient.
>
> I created a communication strategy draft. To start, I did a map of Beam 
> channels and content, and I have some questions for you: 
> https://github.com/macruzbar/beam/blob/master/Communication-strategy-DRAFT.md
>
> In order to create these files, I forked the repo. Once this looks good, and 
> if everyone agrees, we can merge the changes to apache/beam.
>
> I didn't assign reviewers for this file because I don't know if there is 
> someone who usually looks at these kinds of documents. So everyone: please 
> feel free to pitch in! I will give this a week for comments.
>
> Looking forward to your comments!
>
> María


Re: [portability] Removing the old portable metrics API...

2019-10-07 Thread Robert Bradshaw
Yes, Dataflow still uses the old API, for both counters and for its
progress/autoscaling mechanisms. We'd need to convert that over as
well (which is on the TODO list but lower than finishing up support
for portability in general).

On Mon, Oct 7, 2019 at 9:56 AM Robert Burke  wrote:
>
> The Go SDK uses the old API [1], but it shouldn't be too hard to migrate it.
>
> The main thing I'd want to do at the same time is move the dependencies on 
> the protos out of that package and have those live only in the harness 
> package [2]. I wasn't aware of that particular separation of concerns until 
> much later, but allows for alternative harness implementations.
>
> I have some other work to get the Per-DoFn profiling metrics (element count, 
> size, time) into the Go SDK this quarter, so I can handle this then.
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/metrics/metrics.go#L474
> [2] 
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/core/runtime/harness
>
> On Fri, Oct 4, 2019 at 6:14 PM Pablo Estrada  wrote:
>>
>> Hello devs,
>> I recently took a look at how Dataflow is retrieving metrics from the Beam 
>> SDK harnesses, and noticed something. As you may (or may not) remember, the 
>> portability API currently has two ways of reporting metrics. Namely, the 
>> newer MonitoringInfo API[1], and the older Metrics one[2].
>>
>> This is somewhat troublesome because now we have two things that do the same 
>> thing. The SDKs report double the amount of metrics[3][4], and I bet it's 
>> confusing for runner implementers.
>>
>> Luckily, it seems like the Flink and Spark runners do use the new API [5][6] 
>> - yay! : ) - so I guess then the only runner that uses the old API is 
>> Dataflow? (internally)
>>
>> Which one does the Samza runner use? +Hai Lu?
>> How about the Go SDK +Robert Burke ? - Ah I bet this uses the old API?
>>
>> If they all use the MonitoringInfos, we may be able to clean up the old api, 
>> and move to the new one (somewhat)soon : )
>>
>> [1] 
>> https://github.com/apache/beam/blob/v2.15.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L395
>> [2] 
>> https://github.com/apache/beam/blob/v2.15.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L391
>> [3] 
>> https://github.com/apache/beam/blob/c1007b678a00ea85671872236edef940a8e56adc/sdks/python/apache_beam/runners/worker/sdk_worker.py#L406-L414
>> [4] 
>> https://github.com/apache/beam/blob/c1007b678a00ea85671872236edef940a8e56adc/sdks/python/apache_beam/runners/worker/sdk_worker.py#L378-L384
>>
>> [5] 
>> https://github.com/apache/beam/blob/44fa33e6518574cb9561f47774e218e0910093fe/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L94-L97
>> [6] 
>> https://github.com/apache/beam/blob/932bd80a17171bd2d8157820ffe09e8389a52b9b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java#L219-L226


Re: [VOTE] Release 2.16.0, release candidate #1

2019-10-04 Thread Robert Bradshaw
OK, this appears to have been a weird config issue on my system
(though the error certainly could have been better). As BEAM-8303 has
a workaround and all else is looking good, I don't think that's worth
another RC.

+1 (binding) to this release.

On Fri, Oct 4, 2019 at 10:56 AM Robert Bradshaw  wrote:
>
> The artifact signatures and contents all look good to me. I've also
> verify the wheels work for the direct runner. However, I'm having an
> issue with trying to run on dataflow with Python 3.6:
>
> python -m apache_beam.examples.wordcount   --input
> gs://clouddfe-robertwb/chicago_taxi_data/eval/data.csv   --output
> gs://clouddfe-robertwb/test/xcounts.txt   --runner=Dataflow
> --project=google.com:clouddfe
> --temp_location=gs://clouddfe-robertwb/fn-api/tmp
> --staging_location=gs://clouddfe-robertwb/tmp
> --sdk_location=staging/apache-beam-2.16.0.zip
> ...
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py",
> line 374, in exists
> self.client.objects.Get(request)  # metadata
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
> line 1100, in Get
> download=download)
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/base_api.py",
> line 729, in _RunMethod
> http, http_request, **opts)
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
> line 360, in MakeRequest
> max_retry_wait, total_wait_sec))
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio_overrides.py",
> line 43, in retry_func
> return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
>   File 
> "/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
> line 294, in HandleExceptionsAndRebuildHttpConnections
> retry_args.exc.status >= 500)):
>
> Is this just me or a wider issue?
>
> On Fri, Oct 4, 2019 at 10:27 AM Pablo Estrada  wrote:
> >
> > Hi all,
> > I looked at https://issues.apache.org/jira/browse/BEAM-8303, and it seems 
> > like the user has a workaround - is that correct?
> > If that's the case, then I vote +1.
> >
> > @Max - lmk if you'd like to discuss further, but for now my vote is on +1.
> > Best
> > -P.
> >
> > On Fri, Oct 4, 2019 at 9:29 AM Mark Liu  wrote:
> >>
> >> +1 (forgot to vote)
> >>
> >> I also triggered Java Nexmark on direct, dataflow, spark and flink runner. 
> >> Didn't saw performance regression from the dashboard 
> >> (https://apache-beam-testing.appspot.com/dashboard-admin)
> >>
> >> On Fri, Oct 4, 2019 at 8:23 AM Mark Liu  wrote:
> >>>
> >>> Thanks for the validation work! I validated following:
> >>>
>>> - Java Quickstart on direct, dataflow, spark local, flink local runners
> >>> - Java mobile gaming on direct and dataflow runner
> >>> - Python Quickstart in batch and streaming in py2/3.5/3.6/3.7 using 
>>> wheels/zip
> >>> - Python Mobile Game in batch/streaming in py2/3.5/3.6/3.7 using 
>>> wheels/zip on direct and dataflow runner
> >>>
> >>> Mark
> >>>
> >>> On Thu, Oct 3, 2019 at 6:57 PM Ahmet Altay  wrote:
> >>>>
> >>>> I see most of the release validations have been completed and marked in 
> >>>> the spreadsheet. Thank you all for doing that. If you have not 
> >>>> validated/voted yet please take a look at the release candidate.
> >>>>
> >>>> On Thu, Oct 3, 2019 at 7:59 AM Thomas Weise  wrote:
> >>>>>
> >>>>> I think there is a different reason why the release manager should 
> >>>>> probably merge/approve all PRs that go into the release branch while 
> >>>>> the release is in progress:
> >>>>>
> >>>>> If/when the need arises for another RC, then only those changes should 
> >>>>> be included that are deemed blockers or explicitly agreed. Otherwise 
> >>>>> the release can potentially be delayed by modifications that invalidate 
> >>>>> prior verification or introduce new instability.

Re: Plan for dropping python 2 support

2019-10-04 Thread Robert Bradshaw
 explicit, one of those 
>>>> dependencies is Dataflow's python pre-portability workers.)
>>>>
>>>> On Thu, Sep 19, 2019 at 5:17 PM Maximilian Michels  wrote:
>>>>>
>>>>> Granted that we just have finalized the Python 3 support, we should
>>>>> allow time for it to mature and for users to make the switch.
>>>>>
>>>>> > Oh, and one more thing, I think it'd make sense for Apache Beam to
>>>>> > sign https://python3statement.org/. The promise is that we'd
>>>>> > discontinue Python 2 support *in* 2020, which is not committing us to
>>>>> > January if we're not ready. Worth a vote?
>>>>>
>>>>> +1
>>>>
>>>>
>>>> +1
>>>>
>>>>>
>>>>>
>>>>> On 19.09.19 15:59, Robert Bradshaw wrote:
>>>>> > Oh, and one more thing, I think it'd make sense for Apache Beam to
>>>>> > sign https://python3statement.org/. The promise is that we'd
>>>>> > discontinue Python 2 support *in* 2020, which is not committing us to
>>>>> > January if we're not ready. Worth a vote?
>>>>> >
>>>>> >
>>>>> > On Thu, Sep 19, 2019 at 3:58 PM Robert Bradshaw  
>>>>> > wrote:
>>>>> >>
>>>>> >> Exactly how long we support Python 2 depends on our users. Other than
>>>>> >> those that speak up (such as yourself, thanks!), it's hard to get a
>>>>> >> handle on how many need Python 2 and for how long. (Should we send out
>>>>> >> a survey? Maybe after some experience with 2.16?)
>>>>
>>>>
>>>> +1, we had some success with collecting information from users using 
>>>> Twitter surveys.
>>>>
>>>>>
>>>>> >>
>>>>> >> On the one hand, the whole ecosystem is finally moving on, and even if
>>>>> >> Beam continues to support Python 2 our dependencies, or other projects
>>>>> >> that are being used in conjunction with Beam, will also be going
>>>>> >> Python 3 only. On the other hand, Beam is, admittedly, quite late to
>>>>> >> the party and could be the one holding people back, and looking at how
>>>>> >> long it took us, if we just barely make it by the end of the year it's
>>>>> >> unreasonable to say at that point "oh, and we're dropping 2.7 at the
>>>>> >> same time."
>>>>> >>
>>>>> >> The good news is that 2.16 is shaping up to be a release I would
>>>>> >> recommend everyone migrate to Python 3 on. The remaining issues are
>>>>> >> things like some issues with main sessions (which already has issues
>>>>> >> in Python 2) and not supporting keyword-only arguments (a new feature,
>>>>> >> not a regression). I would guess that even 2.15 is already good enough
>>>>> >> for most people, at least to kick the tires and running tests to start
>>>>> >> the effort.
>>>>
>>>>
>>>> I share the same sentiment. Beam 2.16 will offer a strong python 3 
>>>> offering. Yes, there are known issues but this is not much different than 
>>>> the known issues for rest of the python offering.
>>>>
>>>>>
>>>>> >>
>>>>> >> (I also agree with the sentiment that once we go 3.x only, it'll be
>>>>> >> likely harder to maintain a 2.x LTS... but the whole LTS thing is
>>>>> >> being discussed in another thread.)
>>>>>
>>>>> >>
>>>>> >> On Thu, Sep 19, 2019 at 2:44 PM Chad Dombrova  
>>>>> >> wrote:
>>>>> >>>
>>>>> >>> Hi all,
>>>>> >>> I had a read through this thread in the archives. It occurred before 
>>>>> >>> I joined the mailing list, so I hope that this email connects up with 
>>>>> >>> the thread properly for everyone.
>>>>> >>>
>>>>> >>> I'd like to respond to the following points:
>>>>> >>>
>>>>> >>>> I believe we are referring to two separate things with support:
>>>>> >>>> - Supporting existing releases for patches - I agree that w

Re: [VOTE] Release 2.16.0, release candidate #1

2019-10-04 Thread Robert Bradshaw
The artifact signatures and contents all look good to me. I've also
verify the wheels work for the direct runner. However, I'm having an
issue with trying to run on dataflow with Python 3.6:

python -m apache_beam.examples.wordcount   --input
gs://clouddfe-robertwb/chicago_taxi_data/eval/data.csv   --output
gs://clouddfe-robertwb/test/xcounts.txt   --runner=Dataflow
--project=google.com:clouddfe
--temp_location=gs://clouddfe-robertwb/fn-api/tmp
--staging_location=gs://clouddfe-robertwb/tmp
--sdk_location=staging/apache-beam-2.16.0.zip
...
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py",
line 374, in exists
self.client.objects.Get(request)  # metadata
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py",
line 1100, in Get
download=download)
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/base_api.py",
line 729, in _RunMethod
http, http_request, **opts)
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
line 360, in MakeRequest
max_retry_wait, total_wait_sec))
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio_overrides.py",
line 43, in retry_func
return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
  File 
"/usr/local/google/home/robertwb/beam-release/release-verify/staging/test-venv/lib/python3.6/site-packages/apitools/base/py/http_wrapper.py",
line 294, in HandleExceptionsAndRebuildHttpConnections
retry_args.exc.status >= 500)):

Is this just me or a wider issue?

On Fri, Oct 4, 2019 at 10:27 AM Pablo Estrada  wrote:
>
> Hi all,
> I looked at https://issues.apache.org/jira/browse/BEAM-8303, and it seems 
> like the user has a workaround - is that correct?
> If that's the case, then I vote +1.
>
> @Max - lmk if you'd like to discuss further, but for now my vote is on +1.
> Best
> -P.
>
> On Fri, Oct 4, 2019 at 9:29 AM Mark Liu  wrote:
>>
>> +1 (forgot to vote)
>>
>> I also triggered Java Nexmark on direct, dataflow, spark and flink runner. 
>> Didn't see a performance regression on the dashboard 
>> (https://apache-beam-testing.appspot.com/dashboard-admin)
>>
>> On Fri, Oct 4, 2019 at 8:23 AM Mark Liu  wrote:
>>>
>>> Thanks for the validation work! I validated the following:
>>>
>>> - Java Quickstart on direct, dataflow,spark local, flink local runner
>>> - Java mobile gaming on direct and dataflow runner
>>> - Python Quickstart in batch and streaming in py2/3.5/3.6/3.7 using 
>>> wheels/zip
>>> - Python Mobile Game in batch/streaming in py2/3.5/3.6/3.7 using wheels/zip 
>>> on direct and dataflow runner
>>>
>>> Mark
>>>
>>> On Thu, Oct 3, 2019 at 6:57 PM Ahmet Altay  wrote:

 I see most of the release validations have been completed and marked in 
 the spreadsheet. Thank you all for doing that. If you have not 
 validated/voted yet please take a look at the release candidate.

 On Thu, Oct 3, 2019 at 7:59 AM Thomas Weise  wrote:
>
> I think there is a different reason why the release manager should 
> probably merge/approve all PRs that go into the release branch while the 
> release is in progress:
>
> If/when the need arises for another RC, then only those changes should be 
> included that are deemed blockers or explicitly agreed. Otherwise the 
> release can potentially be delayed by modifications that invalidate prior 
> verification or introduce new instability.


 I agree with this reasoning. It expresses my concern in a more clear way.

>
>
> Thomas
>
>
> On Thu, Oct 3, 2019 at 3:12 AM Maximilian Michels  wrote:
>>
>>  > For the next time, may I suggest asking release manager to do the
>>  > merging to the release branch. We do not know whether there will be an
>>  > RC2 or not. And if there will not be an RC2 release branch as of now
>>  > does not directly correspond to what will be released.
>>
>> The ground truth for releases are the release tags, not the release
>> branches. Downstream projects should not depend on the release branches.
>> Release branches are merely important for the process of creating a
>> release, but they lose validity after the RC has been created and 
>> released.
>>
>> On 02.10.19 11:45, Ahmet Altay wrote:
>> > +1 (validated python quickstarts). Thank you Mark.
>> >
>> > On Wed, Oct 2, 2019 at 10:49 AM Maximilian Michels  wrote:
>> >
>> > Thanks for preparing the release, Mark! I would like to address
>> > 

Re: Dockerhub push denied for py3.6 and py3.7 image

2019-10-02 Thread Robert Bradshaw
Please add robertwb0 as well.

On Wed, Oct 2, 2019 at 9:09 AM Ahmet Altay  wrote:
>
>
>
> On Tue, Oct 1, 2019 at 8:44 PM Pablo Estrada  wrote:
>>
>> When she set up the repo, Hannah requested PMC members to ask for 
>> privileges, so I did.
>> The set of admins currently is just Hannah and myself - and I don't think 
>> this is available in a public page.
>>
>> We could either have a PMC-managed account, or allow more PMC members to 
>> have admin privileges - for redundancy.
>
>
> I remember we agreed to add any interested PMC members for redundancy.
>
> Could you add my username ('aaltaybeam') to the list please?
>
> Ahmet
>
>>
>> Best
>> -P.
>>
>> On Tue, Oct 1, 2019 at 6:44 PM Ahmet Altay  wrote:
>>>
>>> Who are the admins on dockerhub currently? Is there a page that shows a 
>>> list? The next person doing the release will probably run into similar 
>>> issues. For example, pypi page for beam [1] shows lists of maintainers.
>>>
>>> [1] https://pypi.org/project/apache-beam/
>>>
>>> Thank you,
>>> Ahmet
>>>
>>> On Tue, Oct 1, 2019 at 11:32 AM Mark Liu  wrote:

 I can push them now. Thank you Pablo!

 On Tue, Oct 1, 2019 at 11:05 AM Pablo Estrada  wrote:
>
> You were right that the push permissions for repository maintainers were 
> missing. I've just added the permissions, and you should be able to push 
> to them now.
> Thanks Mark!
>
> On Tue, Oct 1, 2019 at 11:02 AM Pablo Estrada  wrote:
>>
>> I'll check for you. One second.
>>
>> On Tue, Oct 1, 2019 at 10:32 AM Mark Liu  wrote:
>>>
>>> Hello Dockerhub Admins,
>>>
>>> I was able to push Java, Go, py2.7 and py3.5 images to 
>>> hub.docker.com/u/apachebeam for 2.16 release, but failed for py3.6 and 
>>> py3.7 due to "denied: requested access to the resource is denied". 
>>> Wondering if I missed some permissions. Can any Dockerhub admins help 
>>> me check it?
>>>
>>> Thanks,
>>> Mark, Release Manager


Re: Multiple iterations after GroupByKey with SparkRunner

2019-10-01 Thread Robert Bradshaw
For this specific usecase, I would suggest this be done via PTranform URNs.
E.g. one could have a GroupByKeyOneShot whose implementation is

input
    .apply(GroupByKey.create())
    .apply(MapElements.via(kv -> KV.of(kv.getKey(), kv.getValue().iterator())))

A runner would be free to recognize and optimize this in the graph (based
on its urn) and swap out a more efficient implementation. Of course a
Coder<Iterator<V>> would have to be introduced, and the semantics of
PCollection<KV<K, Iterator<V>>> are a bit odd due to the inherently mutable nature of
Iterators. (Possibly a ReducePerKey transform would be a better
abstraction.)
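
For concreteness, a minimal Python sketch of the same idea; the name
GroupByKeyOneShot and the default expansion below are illustrative (the
runner-side step of recognizing the composite by URN and substituting a
native implementation is not shown):

import apache_beam as beam

class GroupByKeyOneShot(beam.PTransform):
    """Groups by key, exposing the grouped values as a one-shot iterator."""
    def expand(self, pcoll):
        return (
            pcoll
            | beam.GroupByKey()
            # Wrap the reiterable grouped values in a single-pass iterator.
            | beam.MapTuple(lambda key, values: (key, iter(values))))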


On Tue, Oct 1, 2019 at 2:16 AM Jan Lukavský  wrote:

> The car analogy was meant to say, that in real world you have to make
> decision before you take any action. There is no retroactivity possible.
>
> Reuven pointed out, that it is possible (although it seems a little weird
> to me, but that is the only thing I can tell against it :-)), that the way
> a grouped PCollection is produced might be out of control of a consuming
> operator. One example of this might be, that the grouping is produced in a
> submodule (some library), but still, the consumer wants to be able to
> specify if he wants or doesn't want reiterations. There still is a
> "classical" solution to this - the library might expose an interface to
> specify a factory for the grouped PCollection, so that the user of the
> library will be able to specify what he wants. But we can say, that we
> don't want to force users (or authors of libraries) to do that. That's okay
> for me.
>
> If we move on, our next option might be to specify the annotation on the
> consumer (as suggested), but that has all the "not really nice" properties
> of being counter-intuitive, ignoring strong types, etc., etc., for which
> reason I think that this should be ruled out as well.
>
> This leaves us with a single option (at least I have not figured out any
> other) - which is we can bundle GBK and associated ParDo into atomic
> PTransform, which can then be overridden by runners that need special
> handling of this situation - these are all runners that need buffer data to
> memory in order to support reiterations (spark and flink, note that this
> problem arises only for batch case, because in streaming case, one can
> reasonably assume that the data resides in a state that supports
> reiterations). But - we already have this PTransform in Euphoria, it is
> called ReduceByKey, and has all the required properties (technically, it is
> not a PTransform now, but that is a minor detail and can be changed
> trivially).
>
> So, the direction I was trying to take this discussion was - what could be
> the best way for a runner to natively support a PTransform from a DSL? I
> can imagine several options:
>
>  a) support it directly and let runners depend on the DSL (compileOnly
> dependency might suffice, because users will include the DSL into their
> code to be able to use it)
>
>  b) create an interface in runners for user-code to be able to provide
> translation for user-specified operators (this could be absolutely generic,
> DSLs might just use this feature the same way any user could), after all
> runners already use a concept of Translator, but that is pretty much
> copy-pasted, not abstracted into a general purpose one
>
>  c) move the operators that need to be translated into core
>
> The option (c) then leaves open questions related to - if we would want to
> move other operators to core, would this be the right time to ask questions
> if our current set of "core" operators is the ideal one? Or could this be
> optimized?
>
> Jan
> On 10/1/19 12:32 AM, Kenneth Knowles wrote:
>
> In the car analogy, you have something this:
>
> Iterable: car
> Iterator: taxi ride
>
> They are related, but not as variations of a common concept.
>
> In the discussion of Combine vs RSBK, if the reducer is required to be an
> associative and commutative operator, then it is the same thing under a
> different name. If the reducer can be non-associative or non-commutative,
> then it admits fewer transformations/optimizations.
>
> If you introduce a GroupIteratorsByKey and implement GroupByKey as a
> transform that combines the iterator by concatenation, I think you do get
> an internally consistent system. To execute efficiently, you need to always
> identify and replace the GroupByKey operation with a primitive one. It does
> make some sense to expose the weakest primitives for the sake of DSLs. But
> they are very poorly suited for end-users, and for GBK on most runners you
> get the more powerful one for free.
>
> Kenn
>
> On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský  wrote:
>
>> > The fact that the annotation on the ParDo "changes" the GroupByKey
>> implementation is very specific to the Spark runner implementation.
>>
>> I don't quite agree. It is not very specific to Spark, it is specific to
>> generally all runners, that produce grouped elements in a way that is not
>> reiterable. That is the key property. The 

Re: [VOTE] Sign a pledge to discontinue support of Python 2 in 2020.

2019-10-01 Thread Robert Bradshaw
The correct link is https://python3statement.org/

On Tue, Oct 1, 2019 at 10:14 AM Mark Liu  wrote:
>
> +1
>
> btw, the link (http://python3stament.org) you provided is broken.
>
> On Tue, Oct 1, 2019 at 9:44 AM Udi Meiri  wrote:
>>
>> +1
>>
>> On Tue, Oct 1, 2019 at 3:22 AM Łukasz Gajowy  wrote:
>>>
>>> +1
>>>
>>> wt., 1 paź 2019 o 11:29 Maximilian Michels  napisał(a):

 +1

 On 30.09.19 23:03, Reza Rokni wrote:
 > +1
 >
 > On Tue, 1 Oct 2019 at 13:54, Tanay Tummalapalli  wrote:
 >
 > +1
 >
 > On Tue, Oct 1, 2019 at 8:19 AM Suneel Marthi  wrote:
 >
 > +1
 >
 > On Mon, Sep 30, 2019 at 10:33 PM Manu Zhang  wrote:
 >
 > +1
 >
 > On Tue, Oct 1, 2019 at 9:44 AM Austin Bennett  wrote:
 >
 > +1
 >
 > On Mon, Sep 30, 2019 at 5:22 PM Valentyn Tymofieiev  wrote:
 >
 > Hi everyone,
 >
 > Please vote whether to sign a pledge on behalf of
 > Apache Beam to sunset Beam Python 2 offering (in new
 > releases) in 2020 on http://python3stament.org as
 > follows:
 >
 > [ ] +1: Sign a pledge to discontinue support of
 > Python 2 in Beam in 2020.
 > [ ] -1: Do not sign a pledge to discontinue support
 > of Python 2 in Beam in 2020.
 >
 > The motivation and details for this vote were
 > discussed in [1, 2]. Please follow up in [2] if you
 > have any questions.
 >
 > This is a procedural vote [3] that will follow the
 > majority approval rules and will be open for at
 > least 72 hours.
 >
 > Thanks,
 > Valentyn
 >
 > [1]
 > 
 > https://lists.apache.org/thread.html/eba6caa58ea79a7ecbc8560d1c680a366b44c531d96ce5c699d41535@%3Cdev.beam.apache.org%3E
 > [2]
 > 
 > https://lists.apache.org/thread.html/456631fe1a696c537ef8ebfee42cd3ea8121bf7c639c52da5f7032e7@%3Cdev.beam.apache.org%3E
 > [3] https://www.apache.org/foundation/voting.html
 >
 >
 >


Re: [VOTE] Sign a pledge to discontinue support of Python 2 in 2020.

2019-09-30 Thread Robert Bradshaw
+1

On Mon, Sep 30, 2019 at 5:35 PM David Cavazos  wrote:
>
> +1
>
> On Mon, Sep 30, 2019 at 5:27 PM Ahmet Altay  wrote:
>>
>> +1
>>
>> On Mon, Sep 30, 2019 at 5:22 PM Valentyn Tymofieiev  
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> Please vote whether to sign a pledge on behalf of Apache Beam to sunset 
>>> Beam Python 2 offering (in new releases) in 2020 on 
>>> http://python3stament.org as follows:
>>>
>>> [ ] +1: Sign a pledge to discontinue support of Python 2 in Beam in 2020.
>>> [ ] -1: Do not sign a pledge to discontinue support of Python 2 in Beam in 
>>> 2020.
>>>
>>> The motivation and details for this vote were discussed in [1, 2]. Please 
>>> follow up in [2] if you have any questions.
>>>
>>> This is a procedural vote [3] that will follow the majority approval rules 
>>> and will be open for at least 72 hours.
>>>
>>> Thanks,
>>> Valentyn
>>>
>>> [1] 
>>> https://lists.apache.org/thread.html/eba6caa58ea79a7ecbc8560d1c680a366b44c531d96ce5c699d41535@%3Cdev.beam.apache.org%3E
>>> [2] 
>>> https://lists.apache.org/thread.html/456631fe1a696c537ef8ebfee42cd3ea8121bf7c639c52da5f7032e7@%3Cdev.beam.apache.org%3E
>>> [3] https://www.apache.org/foundation/voting.html
>>>


Re: Why is there no standard boolean coder?

2019-09-27 Thread Robert Bradshaw
Yes, go ahead and do this (though for your usecase I'm hoping we'll be
able to switch to schemas soon).
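
For reference, a minimal sketch of the single-byte encoding Java's
BooleanCoder uses (True -> 0x01, False -> 0x00); this is illustrative,
not the actual SDK implementation:

class BooleanCoderSketch:
    def encode(self, value):
        # One byte on the wire, matching Java's BooleanCoder.
        return b'\x01' if value else b'\x00'

    def decode(self, encoded):
        # Anything other than 0x01 decodes to False.
        return encoded == b'\x01'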

On Fri, Sep 27, 2019 at 5:35 PM Chad Dombrova  wrote:
>
> Would BooleanCoder continue to fall into this category?  I was under the 
> impression we might make it a full-fledged standard coder with this PR.
>
>
>
> On Fri, Sep 27, 2019 at 5:32 PM Brian Hulette  wrote:
>>
>> +1, thank you!
>>
>> Note In my Row Coder PR I added a new section for "Additional Standard 
>> Coders" - i.e. coders that have a URN, but aren't required for a new 
>> runner/sdk to implement the beam model: 
>> https://github.com/apache/beam/pull/9188/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2R646
>>
>> I think this would belong there as well, assuming that is a distinction we 
>> want to make.
>>
>> On Fri, Sep 27, 2019 at 5:22 PM Thomas Weise  wrote:
>>>
>>> +1 for adding the coder
>>>
>>> Please also add a test here: 
>>> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
>>>
>>>
>>> On Fri, Sep 27, 2019 at 5:17 PM Chad Dombrova  wrote:
>>>>
>>>> Are there any dissenting votes to making a BooleanCoder a standard 
>>>> (portable) coder?
>>>>
>>>> I'm happy to make a PR to implement a BooleanCoder in python (and to add 
>>>> the Java BooleanCoder to the ModelCoderRegistrar) if everyone agrees that 
>>>> this is useful.
>>>>
>>>> -chad
>>>>
>>>>
>>>> On Fri, Sep 27, 2019 at 3:32 PM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>> I think boolean is useful to have. What I'm more skeptical of is
>>>>> adding standard types for variations like UnsignedInteger16, etc. that
>>>>> don't have natural representations in all languages.
>>>>>
>>>>> On Fri, Sep 27, 2019 at 2:46 PM Brian Hulette  wrote:
>>>>> >
>>>>> > Some more context from an offline discussion I had with +Robert 
>>>>> > Bradshaw a while ago: We both agreed all of the coders listed in 
>>>>> > BEAM-7996 should be implemented in Python, but didn't come to a 
>>>>> > conclusion on whether or not they should actually be _standard_ coders, 
>>>>> > versus just being implicitly standard as part of row coder.
>>>>> >
>>>>> > On Fri, Sep 27, 2019 at 2:29 PM Kenneth Knowles  wrote:
>>>>> >>
>>>>> >> Yes, noted here: 
>>>>> >> https://github.com/apache/beam/pull/9188/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2R678
>>>>> >>  and that links to https://issues.apache.org/jira/browse/BEAM-7996
>>>>> >>
>>>>> >> Kenn
>>>>> >>
>>>>> >> On Fri, Sep 27, 2019 at 12:57 PM Reuven Lax  wrote:
>>>>> >>>
>>>>> >>> Java has one, implemented as a byte coder. My guess is that nobody 
>>>>> >>> has gotten around to implementing it yet for portability.
>>>>> >>>
>>>>> >>> On Fri, Sep 27, 2019 at 12:44 PM Chad Dombrova  
>>>>> >>> wrote:
>>>>> >>>>
>>>>> >>>> Hi all,
>>>>> >>>> It seems a bit unfortunate that there isn’t a portable way to 
>>>>> >>>> serialize a boolean value.
>>>>> >>>>
>>>>> >>>> I’m working on porting my external PubsubIO PR over to use the 
>>>>> >>>> improved schema-based external transform API in python, but because 
>>>>> >>>> of this limitation I can’t use boolean values. For example, this 
>>>>> >>>> fails:
>>>>> >>>>
>>>>> >>>> ReadFromPubsubSchema = typing.NamedTuple(
>>>>> >>>>     'ReadFromPubsubSchema',
>>>>> >>>>     [
>>>>> >>>>         ('topic', typing.Optional[unicode]),
>>>>> >>>>         ('subscription', typing.Optional[unicode]),
>>>>> >>>>         ('id_label', typing.Optional[unicode]),
>>>>> >>>>         ('with_attributes', bool),
>>>>> >>>>         ('timestamp_attribute', typing.Optional[unicode]),
>>>>> >>>>     ]
>>>>> >>>> )
>>>>> >>>>
>>>>> >>>> It fails because coders.get_coder(bool) returns the non-portable 
>>>>> >>>> pickle coder.
>>>>> >>>>
>>>>> >>>> In the short term I can hack something into the external transform 
>>>>> >>>> API to use varint coder for bools, but this kind of hacky approach 
>>>>> >>>> to portability won’t work in scenarios where round-tripping is 
>>>>> >>>> required without user intervention. In other words, in python it is 
>>>>> >>>> not uncommon to test if x is True, in which case the integer 1 would 
>>>>> >>>> fail this test. All of that is to say that a BooleanCoder would be a 
>>>>> >>>> convenient way to ensure the proper type is used everywhere.
>>>>> >>>>
>>>>> >>>> So, I was just wondering why it’s not there? Are there concerns over 
>>>>> >>>> whether booleans are universal enough to make part of the 
>>>>> >>>> portability standard?
>>>>> >>>>
>>>>> >>>> -chad


Re: Why is there no standard boolean coder?

2019-09-27 Thread Robert Bradshaw
I think boolean is useful to have. What I'm more skeptical of is
adding standard types for variations like UnsignedInteger16, etc. that
don't have natural representations in all languages.

On Fri, Sep 27, 2019 at 2:46 PM Brian Hulette  wrote:
>
> Some more context from an offline discussion I had with +Robert Bradshaw a 
> while ago: We both agreed all of the coders listed in BEAM-7996 should be 
> implemented in Python, but didn't come to a conclusion on whether or not they 
> should actually be _standard_ coders, versus just being implicitly standard 
> as part of row coder.
>
> On Fri, Sep 27, 2019 at 2:29 PM Kenneth Knowles  wrote:
>>
>> Yes, noted here: 
>> https://github.com/apache/beam/pull/9188/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2R678
>>  and that links to https://issues.apache.org/jira/browse/BEAM-7996
>>
>> Kenn
>>
>> On Fri, Sep 27, 2019 at 12:57 PM Reuven Lax  wrote:
>>>
>>> Java has one, implemented as a byte coder. My guess is that nobody has 
>>> gotten around to implementing it yet for portability.
>>>
>>> On Fri, Sep 27, 2019 at 12:44 PM Chad Dombrova  wrote:
>>>>
>>>> Hi all,
>>>> It seems a bit unfortunate that there isn’t a portable way to serialize a 
>>>> boolean value.
>>>>
>>>> I’m working on porting my external PubsubIO PR over to use the improved 
>>>> schema-based external transform API in python, but because of this 
>>>> limitation I can’t use boolean values. For example, this fails:
>>>>
>>>> ReadFromPubsubSchema = typing.NamedTuple(
>>>>     'ReadFromPubsubSchema',
>>>>     [
>>>>         ('topic', typing.Optional[unicode]),
>>>>         ('subscription', typing.Optional[unicode]),
>>>>         ('id_label', typing.Optional[unicode]),
>>>>         ('with_attributes', bool),
>>>>         ('timestamp_attribute', typing.Optional[unicode]),
>>>>     ]
>>>> )
>>>>
>>>> It fails because coders.get_coder(bool) returns the non-portable pickle 
>>>> coder.
>>>>
>>>> In the short term I can hack something into the external transform API to 
>>>> use varint coder for bools, but this kind of hacky approach to portability 
>>>> won’t work in scenarios where round-tripping is required without user 
>>>> intervention. In other words, in python it is not uncommon to test if x is 
>>>> True, in which case the integer 1 would fail this test. All of that is to 
>>>> say that a BooleanCoder would be a convenient way to ensure the proper 
>>>> type is used everywhere.
>>>>
>>>> So, I was just wondering why it’s not there? Are there concerns over 
>>>> whether booleans are universal enough to make part of the portability 
>>>> standard?
>>>>
>>>> -chad


Re: Collecting feedback for Beam usage

2019-09-26 Thread Robert Bradshaw
running direct runner unit tests, causing lots of traffic.
>> - I would not dismiss the possibility of spam and attacks.
>>
>> I'd recommend to start by listing the questions we're hoping to answer using 
>> the collected feedback, and then judging whether the proposed method indeed 
>> allows answering them while respecting the users' privacy.
>>
>> On Tue, Sep 24, 2019 at 1:49 PM Lukasz Cwik  wrote:
>>>
>>> One of the options could be to just display the URL and not to phone home. 
>>> I would like it so that users can integrate this into their deployment 
>>> solution so we get regular stats instead of only when a user decides to run 
>>> a pipeline manually.
>>>
>>> On Tue, Sep 24, 2019 at 11:13 AM Robert Bradshaw  
>>> wrote:
>>>>
>>>> I think the goal is to lower the barrier of entry. Displaying a URL to
>>>> click on while waiting for your pipeline to start up, that contains
>>>> all the data explicitly visible, is about as easy as it gets.
>>>> Remembering to run a new (probably not as authentic) pipeline with
>>>> that flag is less so.
>>>>
>>>> On Tue, Sep 24, 2019 at 11:04 AM Mikhail Gryzykhin  
>>>> wrote:
>>>> >
>>>> > I'm with Luke on this. We can add a set of flags to send home stats and 
>>>> > crash dumps if user agrees. If we keep code isolated, it will be easy 
>>>> > enough for user to check what is being sent.
>>>> >
>>>> > One more heavy-weight option is to also allow the user to configure and persist 
>>>> > what information he is ok with sharing.
>>>> >
>>>> > --Mikhail
>>>> >
>>>> >
>>>> > On Tue, Sep 24, 2019 at 10:02 AM Lukasz Cwik  wrote:
>>>> >>
>>>> >> Why not add a flag to the SDK that would do the phone home when 
>>>> >> specified?
>>>> >>
>>>> >> From a support perspective it would be useful to know:
>>>> >> * SDK version
>>>> >> * Runner
>>>> >> * SDK provided PTransforms that are used
>>>> >> * Features like user state/timers/side inputs/splittable dofns/...
>>>> >> * Graph complexity (# nodes, # branches, ...)
>>>> >> * Pipeline failed or succeeded
>>>> >>
>>>> >> On Mon, Sep 23, 2019 at 3:18 PM Robert Bradshaw  
>>>> >> wrote:
>>>> >>>
>>>> >>> On Mon, Sep 23, 2019 at 3:08 PM Brian Hulette  
>>>> >>> wrote:
>>>> >>> >
>>>> >>> > Would people actually click on that link though? I think Kyle has a 
>>>> >>> > point that in practice users would only find and click on that link 
>>>> >>> > when they're having some kind of issue, especially if the link has 
>>>> >>> > "feedback" in it.
>>>> >>>
>>>> >>> I think the idea is that we would make the link very light-weight,
>>>> >>> kind of like a survey (but even easier as it's pre-populated).
>>>> >>> Basically an opt-in phone-home. If we don't collect any personal data
>>>> >>> (not even IP/geo, just (say) version + runner, all visible in the
>>>> >>> URL), no need to guard/anonymize (and this may be sufficient--I don't
>>>> >>> think we have to worry about spammers and ballot stuffers given the
>>>> >>> target audience). If we can catch people while they wait for their
>>>> >>> pipeline to start up (and/or complete), this is a great time to get
>>>> >>> some feedback.
>>>> >>>
>>>> >>> > I agree usage data would be really valuable, but I'm not sure that 
>>>> >>> > this approach would get us good data. Is there a way to get download 
>>>> >>> > statistics for the different runner artifacts? Maybe that could be a 
>>>> >>> > better metric to compare usage.
>>>> >>>
>>>> >>> This'd be useful too, but hard to get and very noisy.
>>>> >>>
>>>> >>> >
>>>> >>> > On Mon, Sep 23, 2019 at 2:57 PM Ankur Goenka  
>>>> >>> > wrote:
>>>> >>> >>
>>>> >>> >> I agree, these are the questions that need to b

Re: Jenkins queue times steadily increasing for a few months now

2019-09-24 Thread Robert Bradshaw
Yeah, that's useful. I was asking about getting things at the jenkins
job level. E.g. are our PostCommits taking up all the time, or our
Precommits?

On Tue, Sep 24, 2019 at 1:23 PM Lukasz Cwik  wrote:
>
> We can get the per gradle task profile with the --profile flag: 
> https://jakewharton.com/static/files/trace/profile.html
> This information also appears within the build scans that are sent to Gradle.
>
> Integrating with either of these sources of information would allow us to 
> figure out whether its new tasks or old tasks taking longer.
>
> On Tue, Sep 24, 2019 at 12:23 PM Robert Bradshaw  wrote:
>>
>> Does anyone know how to gather stats on where the time is being spent?
>> Several times the idea of consolidating many of the (expensive)
>> validates runner integration tests into a single pipeline, and then
>> running things individually only if that fails, has come up. I think
>> that'd be a big win if indeed this is where our time is being spent.
>>
>> On Tue, Sep 24, 2019 at 12:13 PM Daniel Oliveira  
>> wrote:
>> >
>> > Those ideas all sound good. I especially agree with trying to reduce tests 
>> > first and then if we've done all we can there and latency is still too 
>> > high, it means we need more workers. Also in addition to reducing the 
>> > amount of tests, there's also running less important tests less 
>> > frequently, particularly when it comes to postcommits since many of those 
>> > are resource intensive. That would require people with good context around 
>> > what our many postcommits are used for.
>> >
>> > Another idea I thought of is trying to avoid running automated tests 
>> > outside of peak coding times. Ideally, during the times when we get the 
>> > greatest amounts of PRs (and therefore precommits) we shouldn't have any 
>> > postcommits running. If we have both pre and postcommits going at the same 
>> > time during peak hours, our queue times will shoot up even if the total 
>> > amount of work doesn't change much.
>> >
>> > Btw, you mentioned that this was a problem last year. Do you have any 
>> > links to discussions about that? It seems like it could be useful.
>> >
>> > On Thu, Sep 19, 2019 at 1:10 PM Mikhail Gryzykhin  
>> > wrote:
>> >>
>> >> Hi Daniel,
>> >>
>> >> Generally this looks feasible since jobs wait for new worker to be 
>> >> available to start.
>> >>
>> >> Over time we added more tests and did not deprecate enough, which 
>> >> increases the load on workers. I wonder if we can add something like total 
>> >> runtime of all running jobs? This will be a safeguard metric that will 
>> >> show amount of time we actually run jobs. If it increases with same 
>> >> amount of workers, that will prove that we are overloading them (inverse 
>> >> is not necessarily correct).
>> >>
>> >> On addressing this, we can review approaches we took last year and see if 
>> >> any of them apply. If I do some brainstorming, following ideas come to 
>> >> mind: add more work force, reduce amount of tests, do better work on 
>> >> filtering out irrelevant tests, cancel irrelevant jobs (ie: cancel tests 
>> >> if linter fails) and/or add option for cancelling irrelevant jobs. One 
>> >> more big point can be effort on deflaking, but we seem to be decent in 
>> >> this area.
>> >>
>> >> Regards,
>> >> Mikhail.
>> >>
>> >>
>> >> On Thu, Sep 19, 2019 at 12:22 PM Daniel Oliveira  
>> >> wrote:
>> >>>
>> >>> Hi everyone,
>> >>>
>> >>> A little while ago I was taking a look at the Precommit Latency metrics 
>> >>> on Grafana (link) and saw that the monthly 90th percentile metric has 
>> >>> been really increasing the past few months, from around 10 minutes to 
>> >>> currently around 30 minutes.
>> >>>
>> >>> After doing some light digging I was shown this page (beam load 
>> >>> statistics) which seems to imply that queue times are shooting up when 
>> >>> all the test executors are occupied, and it seems this is happening 
>> >>> longer and more often recently. I also took a look at the commit history 
>> >>> for our Jenkins tests and I see that new tests have steadily been added.
>> >>>
>> >>> I wanted to bring this up with the dev@ to ask:
>> >>>
>> >>> 1. Is this accurate? Can anyone provide insight into the metrics? Does 
>> >>> anyone know how to double check my assumptions with more concrete 
>> >>> metrics?
>> >>>
>> >>> 2. Does anyone have ideas on how to address this?
>> >>>
>> >>> Thanks,
>> >>> Daniel Oliveira


Re: Jenkins queue times steadily increasing for a few months now

2019-09-24 Thread Robert Bradshaw
Does anyone know how to gather stats on where the time is being spent?
Several times the idea of consolidating many of the (expensive)
validates runner integration tests into a single pipeline, and then
running things individually only if that fails, has come up. I think
that'd be a big win if indeed this is where our time is being spent.
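
A toy sketch of that consolidation idea: several independent assertions
packed into a single pipeline so one job covers multiple scenarios (the
scenarios and labels here are illustrative, not Beam's actual
ValidatesRunner suite):

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

with beam.Pipeline() as p:
    assert_that(
        p | 'create1' >> beam.Create([1, 2])
          | 'inc' >> beam.Map(lambda x: x + 1),
        equal_to([2, 3]), label='map_check')
    assert_that(
        p | 'create2' >> beam.Create([('k', 1), ('k', 2)])
          | beam.GroupByKey()
          | 'norm' >> beam.MapTuple(lambda k, vs: (k, sorted(vs))),
        equal_to([('k', [1, 2])]), label='gbk_check')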

On Tue, Sep 24, 2019 at 12:13 PM Daniel Oliveira  wrote:
>
> Those ideas all sound good. I especially agree with trying to reduce tests 
> first and then if we've done all we can there and latency is still too high, 
> it means we need more workers. Also in addition to reducing the amount of 
> tests, there's also running less important tests less frequently, 
> particularly when it comes to postcommits since many of those are resource 
> intensive. That would require people with good context around what our many 
> postcommits are used for.
>
> Another idea I thought of is trying to avoid running automated tests outside 
> of peak coding times. Ideally, during the times when we get the greatest 
> amounts of PRs (and therefore precommits) we shouldn't have any postcommits 
> running. If we have both pre and postcommits going at the same time during 
> peak hours, our queue times will shoot up even if the total amount of work 
> doesn't change much.
>
> Btw, you mentioned that this was a problem last year. Do you have any links 
> to discussions about that? It seems like it could be useful.
>
> On Thu, Sep 19, 2019 at 1:10 PM Mikhail Gryzykhin  wrote:
>>
>> Hi Daniel,
>>
>> Generally this looks feasible since jobs wait for new worker to be available 
>> to start.
>>
>> Over time we added more tests and did not deprecate enough, which increases 
>> the load on workers. I wonder if we can add something like total runtime of all 
>> running jobs? This will be a safeguard metric that will show amount of time 
>> we actually run jobs. If it increases with same amount of workers, that will 
>> prove that we are overloading them (inverse is not necessarily correct).
>>
>> On addressing this, we can review approaches we took last year and see if 
>> any of them apply. If I do some brainstorming, following ideas come to mind: 
>> add more work force, reduce amount of tests, do better work on filtering out 
>> irrelevant tests, cancel irrelevant jobs (ie: cancel tests if linter fails) 
>> and/or add option for cancelling irrelevant jobs. One more big point can be 
>> effort on deflaking, but we seem to be decent in this area.
>>
>> Regards,
>> Mikhail.
>>
>>
>> On Thu, Sep 19, 2019 at 12:22 PM Daniel Oliveira  
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> A little while ago I was taking a look at the Precommit Latency metrics on 
>>> Grafana (link) and saw that the monthly 90th percentile metric has been 
>>> really increasing the past few months, from around 10 minutes to currently 
>>> around 30 minutes.
>>>
>>> After doing some light digging I was shown this page (beam load statistics) 
>>> which seems to imply that queue times are shooting up when all the test 
>>> executors are occupied, and it seems this is happening longer and more 
>>> often recently. I also took a look at the commit history for our Jenkins 
>>> tests and I see that new tests have steadily been added.
>>>
>>> I wanted to bring this up with the dev@ to ask:
>>>
>>> 1. Is this accurate? Can anyone provide insight into the metrics? Does 
>>> anyone know how to double check my assumptions with more concrete metrics?
>>>
>>> 2. Does anyone have ideas on how to address this?
>>>
>>> Thanks,
>>> Daniel Oliveira


Re: Collecting feedback for Beam usage

2019-09-23 Thread Robert Bradshaw
On Mon, Sep 23, 2019 at 3:08 PM Brian Hulette  wrote:
>
> Would people actually click on that link though? I think Kyle has a point 
> that in practice users would only find and click on that link when they're 
> having some kind of issue, especially if the link has "feedback" in it.

I think the idea is that we would make the link very light-weight,
kind of like a survey (but even easier as it's pre-populated).
Basically an opt-in phone-home. If we don't collect any personal data
(not even IP/geo, just (say) version + runner, all visible in the
URL), no need to guard/anonymize (and this may be sufficient--I don't
think we have to worry about spammers and ballot stuffers given the
target audience). If we can catch people while they wait for their
pipeline to start up (and/or complete), this is a great time to get
some feedback.

> I agree usage data would be really valuable, but I'm not sure that this 
> approach would get us good data. Is there a way to get download statistics 
> for the different runner artifacts? Maybe that could be a better metric to 
> compare usage.

This'd be useful too, but hard to get and very noisy.

>
> On Mon, Sep 23, 2019 at 2:57 PM Ankur Goenka  wrote:
>>
>> I agree, these are the questions that need to be answered.
>> The data can be anonymized and stored as public data in BigQuery or some 
>> other place.
>>
>> The intent is to gather usage statistics so that we can learn what people 
>> are using (Flink or Spark etc.); it is not intended as a discussion or 
>> help channel.
>> I also think that we don't need to monitor this actively as it's more like a 
>> survey rather than an active channel to get issues resolved.
>>
>> If we think it's useful for the community, then we can come up with a solution 
>> for how to do this (similar to how we released the container images).
>>
>>
>>
>> On Fri, Sep 20, 2019 at 4:38 PM Kyle Weaver  wrote:
>>>
>>> There are some logistics that would need to be worked out. For example, where 
>>> would the data go? Who would own it?
>>>
>>> Also, I'm not convinced we need yet another place to discuss Beam when we 
>>> already have discussed the challenge of simultaneously monitoring mailing 
>>> lists, Stack Overflow, Slack, etc. While "how do you use Beam" is certainly 
>>> an interesting question, and I'd be curious to know that >= X many people 
>>> use a certain runner, I'm not sure answers to these questions are as useful 
>>> for guiding the future of Beam as discussions on the dev/users lists, etc. 
>>> as the latter likely result in more depth/specific feedback.
>>>
>>> However, I do think it could be useful in general to include links directly 
>>> in the console output. For example, maybe something along the lines of "Oh 
>>> no, your Flink pipeline crashed! Check Jira/file a bug/ask the mailing 
>>> list."
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Fri, Sep 20, 2019 at 4:14 PM Ankur Goenka  wrote:

 Hi,

 At the moment we don't really have a good way to collect any usage 
 statistics for Apache Beam. Like runner used etc. As many of the users 
 don't really have a way to report their usecase.
 How about if we create a feedback page where users can add their pipeline 
 details and usecase.
 Also, we can start printing the link to this page when user launch the 
 pipeline in the command line.
 Example:
 $ python my_pipeline.py --runner DirectRunner --input /tmp/abc

 Starting pipeline
 Please use http://feedback.beam.org?args=runner=DirectRunner,input=/tmp/abc
 Pipeline started
 ..

 Using a link and not publishing the data automatically will give users 
 control over what they publish and what they don't. We can enhance the 
 text and usage further, but the basic idea is to ask for user feedback at 
 each run of the pipeline.
 Let me know what you think.


 Thanks,
 Ankur
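
A hedged sketch of how such a pre-populated link could be assembled from
pipeline options; the host name and parameter set are hypothetical, and
nothing is sent automatically, so the user sees exactly what the URL
contains before clicking:

from urllib.parse import urlencode

def feedback_url(options):
    # Only non-sensitive, user-visible settings go into the query string.
    return 'http://feedback.beam.example/?' + urlencode(options)

print('Please use ' + feedback_url(
    {'runner': 'DirectRunner', 'sdk_version': '2.16.0'}))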


Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-19 Thread Robert Bradshaw
On Thu, Sep 19, 2019 at 4:33 PM Maximilian Michels  wrote:
>
> > This is obviously less than ideal for the user... Should we "fix" the
> > Java SDK? Of is the long-terms solution here to have runners do this
> > rewrite?
>
> I think ideal would be that the Runner adds the Impulse override. That
> way also the Python SDK would not have to have separate code paths for
> Reads.

Or, rather, that the Runner adds the non-Impulse override (in Java and Python).

> On 19.09.19 11:46, Robert Bradshaw wrote:
> > On Thu, Sep 19, 2019 at 11:22 AM Maximilian Michels  wrote:
> >>
> >> The flag is insofar relevant to the PortableRunner because it affects
> >> the translation of the pipeline. Without the flag we will generate
> >> primitive Reads which are unsupported in portability. The workaround we
> >> have used so far is to check for the Runner (e.g. PortableRunner) during
> >> pipeline translation and then add it automatically.
> >>
> >> A search in the Java code base reveals 18 occurrences of the flag, all
> >> inside the Dataflow Runner. This is good because the Java SDK itself
> >> does not make use of it. In portable Java pipelines the pipeline author
> >> has to take care to override primitive reads with the JavaReadViaImpulse
> >> wrapper.
> >
> > This is obviously less than ideal for the user... Should we "fix" the
> > Java SDK? Of is the long-terms solution here to have runners do this
> > rewrite?
> >
> >> On the Python side the IO code uses the flag directly to either generate
> >> a primitive Read or a portable Impulse + ParDoReadAdapter.
> >>
> >> Would it be conceivable to remove the beam_fn_api flag and introduce a
> >> legacy flag which the Dataflow Runner could then use? With more runners
> >> implementing portability, I believe this would make sense.
> >>
> >> Thanks,
> >> Max
> >>
> >> On 18.09.19 18:29, Ahmet Altay wrote:
> >>> I believe the flag was never relevant for PortableRunner. I might be
> >>> wrong as well. The flag affects a few bits in the core code and that is
> >>> why the solution cannot be by just setting the flag in Dataflow runner.
> >>> It requires some amount of clean up. I agree that it would be good to
> >>> clean this up, and I also agree to not rush this especially if this is
> >>> not currently impacting users.
> >>>
> >>> Ahmet
> >>>
> >>> On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels  wrote:
> >>>
> >>>   > I disagree that this flag is obsolete. It is still serving a
> >>>  purpose for batch users using dataflow runner and that is decent
> >>>  chunk of beam python users.
> >>>
> >>>  It is obsolete for the PortableRunner. If the Dataflow Runner needs
> >>>  this
> >>>  flag, couldn't we simply add it there? As far as I know Dataflow 
> >>> users
> >>>  do not use the PortableRunner. I might be wrong.
> >>>
> >>>  As Kyle mentioned, he already fixed the issue. The fix is only 
> >>> present
> >>>  in the 2.16.0 release though. This flag has repeatedly caused 
> >>> friction
> >>>  for users and that's why I want to get rid of it.
> >>>
> >>>  There is of course no need to rush this but it would be great to 
> >>> tackle
> >>>  this for the next release. Filed a JIRA:
> >>>  https://jira.apache.org/jira/browse/BEAM-8274
> >>>
> >>>  Cheers,
> >>>  Max
> >>>
> >>>  On 17.09.19 15:39, Kyle Weaver wrote:
> >>>   > Actually, the reported issues are already fixed on head. We're 
> >>> just
> >>>   > trying to prevent similar issues in the future.
> >>>   >
> >>>   > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >>>   >
> >>>   > On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay  wrote:

Re: Plan for dropping python 2 support

2019-09-19 Thread Robert Bradshaw
Oh, and one more thing, I think it'd make sense for Apache Beam to
sign https://python3statement.org/. The promise is that we'd
discontinue Python 2 support *in* 2020, which is not committing us to
January if we're not ready. Worth a vote?


On Thu, Sep 19, 2019 at 3:58 PM Robert Bradshaw  wrote:
>
> Exactly how long we support Python 2 depends on our users. Other than
> those that speak up (such as yourself, thanks!), it's hard to get a
> handle on how many need Python 2 and for how long. (Should we send out
> a survey? Maybe after some experience with 2.16?)
>
> On the one hand, the whole ecosystem is finally moving on, and even if
> Beam continues to support Python 2 our dependencies, or other projects
> that are being used in conjunction with Beam, will also be going
> Python 3 only. On the other hand, Beam is, admittedly, quite late to
> the party and could be the one holding people back, and looking at how
> long it took us, if we just barely make it by the end of the year it's
> unreasonable to say at that point "oh, and we're dropping 2.7 at the
> same time."
>
> The good news is that 2.16 is shaping up to be a release I would
> recommend everyone migrate to Python 3 on. The remaining issues are
> things like some issues with main sessions (which already have issues
> in Python 2) and not supporting keyword-only arguments (a new feature,
> not a regression). I would guess that even 2.15 is already good enough
> for most people, at least to kick the tires and run tests to start
> the effort.
>
> (I also agree with the sentiment that once we go 3.x only, it'll be
> likely harder to maintain a 2.x LTS... but the whole LTS thing is
> being discussed in another thread.)
>
> On Thu, Sep 19, 2019 at 2:44 PM Chad Dombrova  wrote:
> >
> > Hi all,
> > I had a read through this thread in the archives. It occurred before I 
> > joined the mailing list, so I hope that this email connects up with the 
> > thread properly for everyone.
> >
> > I'd like to respond to the following points:
> >
> >> I believe we are referring to two separate things with support:
> >> - Supporting existing releases for patches - I agree that we need to give
> >> users a long enough window to upgrade. Great if it happens with an LTS
> >> release. Even if it does not, I think it will be fair to offer patches on
> >> the last python 2 supporting release during some part of 2020 if that
> >> becomes necessary.
> >> - Making new releases with python 2 support - Each new Beam release with
> >> python 2 support will implicitly extend the lifetime of beam's python 2
> >> support. I do not think we need to extend this to beyond 2019. 2 releases
> >> (~ 3 months) after solid python 3 support will very likely put the last
> >> python 2 supporting release to last quarter of 2019 already.
> >
> >
> > With so many important features still under active development 
> > (portability, expansion, external IO transforms, schema coders) and new 
> > versions of executors tied to the Beam source, staying behind is not really 
> > an option for many of us, and with python3 support not yet fully completed, 
> > the window in which Beam is fully working for both python versions is 
> > rapidly approaching 2 months, and could ultimately be even less, depending 
> > on how long it takes to complete the dozen remaining issues in Jira, and 
> > whatever pops up thereafter.
> >
> >> The cost of maintaining Python 2.7 support is higher than 0. Some issues
> >> that come to mind:
> >> - Maintaining Py2.7 / Py 3+ compatibility of Beam codebase makes it
> >> difficult to use Python 3 syntax in Beam which may be necessary to support
> >> and test syntactic constructs introduced in Python 3.
> >> - Running additional test suites increases the load on test infrastructure
> >> and increases flakiness.
> >
> >
> > I would argue that the cost of maintaining a python2-only LTS version will 
> > be far greater than maintaining python2 support for a little while longer.  
> > Dropping support for python2 could mean a number of things from simply 
> > disabling the python2 tests, to removing 2-to-3 idioms in favor of 
> > python3-only constructs.  If what you have in mind is anything like the 
> > latter then the master branch will become quite divergent from the LTS 
> > release, and backporting changes will not be as simple as cherry-picking 
> > commits.  All-in-all, I think it's a lose/lose for everyone -- users and 
> > developers, of which I am both -- to drop python2 support on such a short 
> > timeline.
> >
> &

Re: Plan for dropping python 2 support

2019-09-19 Thread Robert Bradshaw
Exactly how long we support Python 2 depends on our users. Other than
those that speak up (such as yourself, thanks!), it's hard to get a
handle on how many need Python 2 and for how long. (Should we send out
a survey? Maybe after some experience with 2.16?)

On the one hand, the whole ecosystem is finally moving on, and even if
Beam continues to support Python 2 our dependencies, or other projects
that are being used in conjunction with Beam, will also be going
Python 3 only. On the other hand, Beam is, admittedly, quite late to
the party and could be the one holding people back, and looking at how
long it took us, if we just barely make it by the end of the year it's
unreasonable to say at that point "oh, and we're dropping 2.7 at the
same time."

The good news is that 2.16 is shaping up to be a release I would
recommend everyone migrate to Python 3 on. The remaining issues are
things like some issues with main sessions (which already have issues
in Python 2) and not supporting keyword-only arguments (a new feature,
not a regression). I would guess that even 2.15 is already good enough
for most people, at least to kick the tires and run tests to start
the effort.

(I also agree with the sentiment that once we go 3.x only, it'll be
likely harder to maintain a 2.x LTS... but the whole LTS thing is
being discussed in another thread.)

On Thu, Sep 19, 2019 at 2:44 PM Chad Dombrova  wrote:
>
> Hi all,
> I had a read through this thread in the archives. It occurred before I joined 
> the mailing list, so I hope that this email connects up with the thread 
> properly for everyone.
>
> I'd like to respond to the following points:
>
>> I believe we are referring to two separate things with support:
>> - Supporting existing releases for patches - I agree that we need to give
>> users a long enough window to upgrade. Great if it happens with an LTS
>> release. Even if it does not, I think it will be fair to offer patches on
>> the last python 2 supporting release during some part of 2020 if that
>> becomes necessary.
>> - Making new releases with python 2 support - Each new Beam release with
>> python 2 support will implicitly extend the lifetime of beam's python 2
>> support. I do not think we need to extend this to beyond 2019. 2 releases
>> (~ 3 months) after solid python 3 support will very likely put the last
>> python 2 supporting release to last quarter of 2019 already.
>
>
> With so many important features still under active development (portability, 
> expansion, external IO transforms, schema coders) and new versions of 
> executors tied to the Beam source, staying behind is not really an option for 
> many of us, and with python3 support not yet fully completed, the window in 
> which Beam is fully working for both python versions is rapidly approaching 2 
> months, and could ultimately be even less, depending on how long it takes to 
> complete the dozen remaining issues in Jira, and whatever pops up thereafter.
>
>> The cost of maintaining Python 2.7 support is higher than 0. Some issues
>> that come to mind:
>> - Maintaining Py2.7 / Py 3+ compatibility of Beam codebase makes it
>> difficult to use Python 3 syntax in Beam which may be necessary to support
>> and test syntactic constructs introduced in Python 3.
>> - Running additional test suites increases the load on test infrastructure
>> and increases flakiness.
>
>
> I would argue that the cost of maintaining a python2-only LTS version will be 
> far greater than maintaining python2 support for a little while longer.  
> Dropping support for python2 could mean a number of things from simply 
> disabling the python2 tests, to removing 2-to-3 idioms in favor of 
> python3-only constructs.  If what you have in mind is anything like the 
> latter then the master branch will become quite divergent from the LTS 
> release, and backporting changes will not be as simple as cherry-picking 
> commits.  All-in-all, I think it's a lose/lose for everyone -- users and 
> developers, of which I am both -- to drop python2 support on such a short 
> timeline.
>
> I'm an active contributor to this project and it will put me and the company 
> that I work for in a very bad position if you force us onto an LTS release in 
> early 2020.  I understand the appeal of moving to python3-only code and I 
> want to get there too, but I would hope that you give your users as much 
> time to transition their own code as the Beam project itself has taken.  I'm 
> not asking for a full 12 months to transition, but more than a couple will be 
> required.
>
> thanks,
> -chad
>
>
>
>


Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-19 Thread Robert Bradshaw
> >  > ive enough time to decouple the flag from the core code. (With a
> >  > quick search I saw two instances related to Read and Create.) Have
> >  > time to test changes and then switch the default.
> >  >
> >  > An isinstance check might be smarter, but does not get rid of
> >  > the root of the problem.
> >  >
> >  > I might be wrong, IIUC, it will temporarily resolve the reported
> >  > issues. Is this not accurate?
> >  >
> >  > -Max
> >  >
> >  > On 17.09.19 14:20, Ahmet Altay wrote:
> >  >  > Could you make that change and see if it would have addressed
> >  >  > the issue here?
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver  wrote:
> >  >  >
> >  >  > The flag is automatically set, but not in a smart way. Taking
> >  >  > another look at the code, a more resilient fix would be to just
> >  >  > check if the runner isinstance of PortableRunner.
> >  >  >
> >  >  > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay  wrote:
> >  >  >
> >  >  > Is not this flag set automatically for the portable runner
> >  >  > here [1]?
> >  >  >
> >  >  > [1] https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw  wrote:
> >  >  >
> >  >  > On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise  wrote:
> >  >  >  >
> >  >  >  > +1 for making --experiments=beam_fn_api default.
> >  >  >  >
> >  >  >  > Can the Dataflow runner driver just remove the setting if
> >  >  >  > it is not compatible?
> >  >

Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-17 Thread Robert Bradshaw
On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise  wrote:
>
> +1 for making --experiments=beam_fn_api default.
>
> Can the Dataflow runner driver just remove the setting if it is not 
> compatible?

The tricky bit would be undoing the differences in graph construction
due to this flag flip. But I would be in favor of changing the default
(probably just removing the flag) and moving the non-portability parts
into the dataflow runner itself. (It looks like the key differences
here are for the Create and Read transforms.)
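
A rough sketch of the isinstance(PortableRunner) check discussed in this
thread, assuming the experiment helpers on DebugOptions; the function name
and its exact placement in pipeline.py are hypothetical:

from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.runners.portability.portable_runner import PortableRunner

def _maybe_add_fn_api_experiment(runner, options):
    # Portable runners require the beam_fn_api experiment; legacy runners
    # (e.g. Dataflow's non-portable mode) must not have it forced on.
    debug_options = options.view_as(DebugOptions)
    if (isinstance(runner, PortableRunner)
            and not debug_options.lookup_experiment('beam_fn_api')):
        debug_options.add_experiment('beam_fn_api')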

> On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels  wrote:
>>
>> +dev
>>
>> The beam_fn_api flag and the way it is automatically set is error-prone.
>> Is there anything that prevents us from removing it? I understand that
>> some Runners, e.g. Dataflow Runner have two modes of executing Python
>> pipelines (legacy and portable), but at this point it seems clear that
>> the portability mode should be the default.
>>
>> Cheers,
>> Max
>>
>> On September 14, 2019 7:50:52 PM PDT, Yu Watanabe
>>  wrote:
>>
>> Kyle
>>
>> Thank you for the assistance.
>>
>> By specifying "experiments" in PipelineOptions ,
>> ==
>> options = PipelineOptions([
>>     "--runner=FlinkRunner",
>>     "--flink_version=1.8",
>>     "--flink_master_url=localhost:8081",
>>     "--experiments=beam_fn_api",
>> ])
>> ==
>>
>> I was able to submit the job successfully.
>>
>> [grpc-default-executor-0] INFO
>> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
>> BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> [grpc-default-executor-0] INFO
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation -
>> Starting job invocation
>> BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
>> [flink-runner-job-invoker] INFO
>> org.apache.beam.runners.flink.FlinkPipelineRunner - Translating
>> pipeline to Flink program.
>> [flink-runner-job-invoker] INFO
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating
>> a Batch Execution Environment.
>> [flink-runner-job-invoker] INFO
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using
>> Flink Master URL localhost:8081.
>> [flink-runner-job-invoker] WARN
>> org.apache.beam.runners.flink.FlinkExecutionEnvironments - No
>> default parallelism could be found. Defaulting to parallelism 1.
>> Please set an explicit parallelism with --parallelism
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.api.java.ExecutionEnvironment - The job has 0
>> registered types and 0 default Kryo serializers
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.configuration.Configuration - Config uses fallback
>> configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
>> [flink-runner-job-invoker] INFO
>> org.apache.flink.client.program.rest.RestClusterClient - Submitting
>> job 4e055a8878dda3f564a7b7c84d48510d (detached: false).
>>
>> Thanks,
>> Yu Watanabe
>>
>> On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver  wrote:
>>
>> Try adding "--experiments=beam_fn_api" to your pipeline options.
>> (This is a known issue with Beam 2.15 that will be fixed in 2.16.)
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>>
>> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe  wrote:
>>
>> Hello.
>>
>> I am trying to spin up the flink runner but looks like data
>> serialization is failing.
>> I would like to ask for help to get over with this error.
>>
>> 
>> 
>> [flink-runner-job-invoker] ERROR
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation
>> - Error during job invocation
>> 
>> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> java.lang.IllegalArgumentException: unable to deserialize
>> BoundedSource
>>  at
>> 
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>>  at
>> 
>> org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>>  at
>> 
>> 

Re: The state of external transforms in Beam

2019-09-16 Thread Robert Bradshaw
Thanks for bringing this up again. My thoughts on the open questions below.

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> That commit solves 2 problems:
>
> Adds the pubsub Java deps so that they’re available in our portable pipeline
> Makes the coder for the PubsubIO message-holder type, PubsubMessage, 
> available as a standard coder. This is required because both PubsubIO.Read 
> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage 
> objects, but only “standard” (i.e. portable) coders can be used, so we have 
> to hack it to make PubsubMessage appear as a standard coder.
>
> More details:
>
> There’s a similar magic commit required for Kafka external transforms
> The Jira issue for this problem is here: 
> https://jira.apache.org/jira/browse/BEAM-7870
> For problem #2 above there seems to be some consensus forming around using 
> Avro or schema/row coders to send compound types in a portable way. Here’s 
> the PR for making row coders portable
> https://github.com/apache/beam/pull/9188

+1. Note that this doesn't mean that the IO itself must produce rows;
part of the Schema work in Java is to make it easy to automatically
convert from various Java classes to schemas transparently, so this
same logic that would allow one to apply an SQL filter directly to a
Kafka/PubSub read would allow cross-language. Even if that doesn't
work, we need not uglify the Java API; we can have an
option/alternative transform that appends the convert-to-Row DoFn for
easier use by external (though the goal of the former work is to make
this step unnecessary).

> I don’t really have any ideas for problem #1

The crux of the issue here is that the jobs API was not designed with
cross-language in mind, and so the artifact API ties artifacts to jobs
rather than to environments. To solve this we need to augment the
notion of environment to allow the specification of additional
dependencies (e.g. jar files in this specific case, or better as
maven/pypi/... dependencies (with version ranges) such that
environment merging and dependency resolution can be sanely done), and
a way for the expansion service to provide such dependencies.

Max wrote up a summary of the prior discussions at
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

In the short term, one can build a custom docker image that has all
the requisite dependencies installed.

This touches on a related but separable issue that one may want to run
some of these transforms "natively" in the same process as the runner
(e.g. a Java IO in the Flink Java Runner) rather than via docker.
(Similarly with subprocess.) Exactly how that works with environment
specifications is also a bit TBD, but my proposal has been that these
are best viewed as runner-specific substitutions of standard
environments.

> So the portability expansion system works, and now it’s time to sand off some 
> of the rough corners. I’d love to hear others’ thoughts on how to resolve 
> some of these remaining issues.

+1
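
(Editorial aside for readers skimming the archive: from the Python side, the
stub-transform flow described above looks roughly like the sketch below. The
URN, payload, and expansion service address are placeholders, not a real
published transform, and the payload-building helpers were still being
reworked in the PR linked in this thread.)

import apache_beam as beam
from apache_beam.transforms.external import ExternalTransform

URN = 'beam:external:java:example:read:v1'  # hypothetical URN
payload = b''  # serialized configuration understood by the Java transform

with beam.Pipeline() as p:
    (p
     # Sent to the expansion service (assumed here to listen on
     # localhost:8097), which returns the expanded Java transform.
     | ExternalTransform(URN, payload, 'localhost:8097')
     | beam.Map(print))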


On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
>
> Hi all,
> There was some interest in this topic at the Beam Summit this week (btw, 
> great job to everyone involved!), so I thought I’d try to summarize the 
> current state of things.
> First, let me explain the idea behind an external transforms for the 
> uninitiated.
>
> Problem:
>
> there’s a transform that you want to use, but it’s not available in your 
> desired language. IO connectors are a good example: there are many available 
> in the Java SDK, but not so much in Python or Go.
>
> Solution:
>
> Create a stub transform in your desired language (e.g. Python) whose primary 
> role is to serialize the parameters passed to that transform
> When you run your portable pipeline, just prior to it being sent to the Job 
> Service for execution, your stub transform’s payload is first sent to the 
> “Expansion Service” that’s running in the native language (Java), where the 
> payload is used to construct an instance of the native transform, which is 
> then expanded and converted to a protobuf and sent back to the calling 
> process (Python).
> The protobuf representation of the expanded transform gets integrated back 
> into the pipeline that you’re submitting
> Steps 2-3 are repeated for each external transform in your pipeline
> Then the whole pipeline gets sent to the Job Service to be invoked on 
> Flink/Spark/etc
>
> 
>
> Now on to my journey to get PubsubIO working in python on Flink.
>
> The first issue I encountered was that there was a lot of boilerplate 
> involved in serializing the stub python transform’s parameters so they can be 
> sent to the expansion service.
>
> I created a PR to make this simpler, which has just been merged to master: 
> https://github.com/apache/beam/pull/9098
>
> With this feature in place, if you’re using python 3.7 you can use a 
> dataclass and the typing module to 

Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Robert Bradshaw
Note that loopback won't fix the problem for, say, cross-language IOs. But,
yes, it's really handy and should probably be used more.
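
(For reference, the rough incantation is sketched below; it assumes a Flink
job server is already running and listening on localhost:8099.)

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    # LOOPBACK runs user code in the submitting process, so no docker
    # images are needed and local files behave as expected.
    '--environment_type=LOOPBACK',
])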

On Fri, Sep 13, 2019 at 8:29 AM Lukasz Cwik  wrote:

> And/or update the wiki/website with some how to's...
>
> On Fri, Sep 13, 2019 at 7:51 AM Thomas Weise  wrote:
>
>> I agree that loopback would be preferable for this purpose. I just wasn't
>> aware this even works with the portable Flink runner. Is it one of the best
>> guarded secrets? ;-)
>>
>> Kyle, can you please post the pipeline options you would use for Flink?
>>
>>
>> On Thu, Sep 12, 2019 at 5:57 PM Kyle Weaver  wrote:
>>
>>> I prefer loopback because a) it writes output files to the local
>>> filesystem, as the user expects, and b) you don't have to pull or build
>>> docker images, or even have docker installed on your system -- which is one
>>> less point of failure.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>
>>>
>>> On Thu, Sep 12, 2019 at 5:48 PM Thomas Weise  wrote:
>>>
>>>> This should become much better with 2.16 when we have the Docker images
>>>> prebuilt.
>>>>
>>>> Docker is probably still the best option for Python on a JVM based
>>>> runner in a local environment that does not have a development setup.
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 1:09 PM Kyle Weaver 
>>>> wrote:
>>>>
>>>>> +dev  I think we should probably point new users
>>>>> of the portable Flink/Spark runners to use loopback or some other
>>>>> non-docker environment, as Docker adds some operational complexity that
>>>>> isn't really needed to run a word count example. For example, Yu's 
>>>>> pipeline
>>>>> errored here because the expected Docker container wasn't built before
>>>>> running.
>>>>>
>>>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>>>> kcwea...@google.com
>>>>>
>>>>>
>>>>> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> On this note, making local files easy to read is something we'd
>>>>>> definitely like to improve, as the current behavior is quite surprising.
>>>>>> This could be useful not just for running with docker and the portable
>>>>>> runner locally, but more generally when running on a distributed system
>>>>>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if 
>>>>>> we
>>>>>> could automatically stage local files to be read as artifacts that could 
>>>>>> be
>>>>>> consumed by any worker (possibly via external directory mounting in the
>>>>>> local docker case rather than an actual copy), and conversely copy small
>>>>>> outputs back to the local machine (with the similar optimization for 
>>>>>> local
>>>>>> docker).
>>>>>>
>>>>>> At the very least, however, we should add obvious messaging when the
>>>>>> local filesystem is used from within docker, which is often a (non-obvious
>>>>>> and hard-to-debug) mistake.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik 
>>>>>> wrote:
>>>>>>
>>>>>>> When you use a local filesystem path and a docker environment,
>>>>>>> "/tmp" is written inside the container. You can solve this issue by:
>>>>>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>>>>>> * Mounting an external directory into the container so that any
>>>>>>> "local" writes appear outside the container
>>>>>>> * Using a non-docker environment such as external or process.
>>>>>>>
>>>>>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello.
>>>>>>>>
>>>>>>>> I would like to ask for help with my sample code using portable
>>>>>>>> runner using apache flink.
>>>>>>>> I was able to work out the wordcount.py using this page.
>>>>>>>>
>>>>>>>> https://beam.apache.or

Re: [discuss] How we support our users on Slack / Mailing list / StackOverflow

2019-09-06 Thread Robert Bradshaw
I would also suggest SO as the best alternative, especially due to its
indexability and searchability. If discussion is needed, the users
list (my preference) or slack can be good options, and ideally the
resolution is brought back to SO.

On Fri, Sep 6, 2019 at 1:10 PM Udi Meiri  wrote:
>
> I don't go on Slack, but I will be notified of mentions. It has the advantage 
> of being an informal space.
> SO can feel just as intimidating as the mailing list IMO. Unlike the others, 
> it doesn't lend itself very well to discussions (you can only post comments 
> or answers).
>
>
>
> On Fri, Sep 6, 2019 at 10:55 AM Pablo Estrada  wrote:
>>
>> Hello all,
>>
>> THE SITUATION:
>> It was brought to my attention recently that Python users in Slack are not 
>> getting much support, because most of the Beam Python-knowledgeable people 
>> are not on Slack. Unfortunately, in the Beam site, we do refer people to 
>> Slack for assistance[1].
>>
>> Java users do receive reasonable support, because there are enough Beam 
>> Java-knowledgeable people online, and willing to answer.
>>
>> On the other hand, at Google we do have a number of people who are 
>> responsible to answer questions on StackOverflow[2], and we do our best to 
>> answer promptly. I think we do a reasonable job overall.
>>
>> SO LET'S DISCUSS:
>> How should we advise the community to ask questions about Beam?
>> - Perhaps we should encourage people to try the mailing list first
>> - Perhaps we should encourage people to try StackOverflow first
>> - Perhaps we should write a bot that encourages Python users to go to 
>> StackOverflow
>> - something else?
>>
>> My personal opinion is that a mailing list is not great: It's intimidating, 
>> it does not provide great indexing or searchability.
>>
>> WHAT I PROPOSE:
>>
>> I think explicitly encouraging everyone to go to StackOverflow first will be 
>> the best alternative: It's indexed, searchable, less intimidating than the 
>> mailing list. We can add that they can try Slack as well - without any 
>> guarantees.
>>
>> What do others think?
>> -P.
>>
>> [1] https://beam.apache.org/community/contact-us/
>> [2] https://stackoverflow.com/questions/tagged/apache-beam?tab=Newest


Re: Stop publishing unneeded Java artifacts

2019-09-03 Thread Robert Bradshaw
:sdks:java:testing:expansion-service could be useful to publish for
testing as well.

On Fri, Aug 30, 2019 at 3:13 PM Lukasz Cwik  wrote:
>
> Google internally relies on being able to get the POM files generated for:
> :sdks:java:testing:nexmark
> :sdks:java:testing:test-utils
>
> Generating the POM files currently relies on publishing being enabled for 
> those projects so could we keep publishing those two modules. Disabling the 
> others sounds fine to me.
>
> On Thu, Aug 29, 2019 at 9:41 PM Lukasz Cwik  wrote:
>>
>> I wanted to double check that we don't rely on this publishing inside Google 
>> for some reason. Will update this thread tomorrow.
>>
>> On Wed, Aug 28, 2019 at 7:11 AM Łukasz Gajowy  wrote:
>>>
>>> Hi all,
>>>
>>> I wanted to notify that in PR 9417 I'm planning to turn off publishing of 
>>> the following modules' artifacts to the maven repository:
>>>
>>> :runners:google-cloud-dataflow-java:worker:windmill
>>> :sdks:java:build-tools
>>> :sdks:java:javadoc
>>> :sdks:java:testing:expansion-service
>>> :sdks:java:io:bigquery-io-perf-tests
>>> :sdks:java:io:file-based-io-tests
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-2
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-5
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-6
>>> :sdks:java:io:elasticsearch-tests:elasticsearch-tests-common
>>> :sdks:java:testing:load-tests
>>> :sdks:java:testing:nexmark
>>> :sdks:java:testing:test-utils
>>>
>>> AFAIK, the purpose of these modules is to keep related 
>>> tests/test-utils/utils together. We are not expecting users to make use of 
>>> such artifacts. Please let me know if you have any objections. If there are 
>>> none and the PR gets merged, the artifacts will no longer be published.
>>>
>>> Thanks!
>>> Łukasz


Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
Just to clarify, the repeated list of cache tokens in the process
bundle request is used to validate reading *and* stored when writing?
In that sense, should they just be called version identifiers or
something like that?

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels  wrote:
>
> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   // A cache token which can be used by an SDK to check for the validity
>   // of cached elements which have a cache token associated.
>   message CacheToken {
>
> // A flag to indicate a cache token is valid for user state.
> message UserState {}
>
> // A flag to indicate a cache token is valid for a side input.
> message SideInput {
>   // The id of a side input.
>   string side_input = 1;
> }
>
> // The scope of a cache token.
> oneof type {
>   UserState user_state = 1;
>   SideInput side_input = 2;
> }
>
> // The cache token identifier which should be globally unique.
> bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
>
> On 27.08.19 19:22, Lukasz Cwik wrote:
>
> SideInputState -> SideInput (side_input_state -> side_input)
> + more comments around the messages and the fields.
>
>
> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels  wrote:
>>
>> We would have to differentiate cache tokens for user state and side inputs. 
>> How about something like this?
>>
>> message ProcessBundleRequest {
>>   // (Required) A reference to the process bundle descriptor that must be
>>   // instantiated and executed by the SDK harness.
>>   string process_bundle_descriptor_reference = 1;
>>
>>   message CacheToken {
>>
>> message UserState {
>> }
>>
>> message SideInputState {
>>   string side_input_id = 1;
>> }
>>
>> oneof type {
>>   UserState user_state = 1;
>>   SideInputState side_input_state = 2;
>> }
>>
>> bytes token = 10;
>>   }
>>
>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>   // cached data returned by the State API across multiple bundles.
>>   repeated CacheToken cache_tokens = 2;
>> }
>>
>> -Max
>>
>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>
>> The bundles view of side inputs should never change during processing and 
>> should have a point in time snapshot.
>>
>> I was just trying to say that the cache token for side inputs being deferred 
>> till side input request time simplified the runners implementation since 
>> that is conclusively when the runner would need to take a look at the side 
>> input. Putting them as part of the ProcesBundleRequest complicates that but 
>> does make the SDK implementation significantly simpler which is a win.
>>
>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels  wrote:
>>>
>>> Thanks for the quick response.
>>>
>>> Just to clarify, the issue with versioning side input is also present
>>> when supplying the cache tokens on a request basis instead of per
>>> bundle. The SDK never knows when the Runner receives a new version of
>>> the side input. Like you pointed out, it needs to mark side inputs as
>>> stale and generate new cache tokens for the stale side inputs.
>>>
>>> The difference between per-request tokens and per-bundle tokens would be
>>> that the side input can only change after a bundle completes vs. during
>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>> no precise instance where side inputs are atomically updated, other than
>>> the assumption that they eventually will be updated. In that regard
>>> per-bundle tokens for side input seem to be fine.
>>>
>>> All of the above is not an issue for user state, as its cache can remain
>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>> solution would be to not cache side input because there are many cases
>>> where the caching just adds additional overhead. However, I can also
>>> imagine cases where side input is valid forever and caching would be
>>> very beneficial.
>>>
>>> For the first version I want to focus on user state because that's where
>>> I see the most benefit for caching. I don't see a problem though for the
>>> Runner to detect new side input and reflect that in the cache tokens
>>> supplied for a new bundle.
>>>
>>> -Max
>>>
>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>> > Your summary below makes sense to me. I can see that recovery from
>>> > rolling back doesn't need to be a priority and simplifies the solution
>>> > for user state caching down to one token.
>>> >
>>> > Providing cache tokens upfront does require the Runner to know what
>>> > "version" of everything it may supply to the SDK upfront (instead of on
>>> > 

Re: Write-through-cache in State logic

2019-08-27 Thread Robert Bradshaw
On Sun, Aug 18, 2019 at 7:30 PM Rakesh Kumar  wrote:
>
> not to completely hijack Max's question but a tangential question regarding 
> LRU cache.
>
> What is the preferred python library for LRU cache?
> I noticed that cachetools [1] is used as one of the dependencies for GCP [2]. 
> Cachetools[1] has LRU cache and it supports Python 2 & 3. It can potentially 
> support our use case.  Can we move cachetools to the required pacakge list 
> [3] and use it for cross bundle caching?
>
> 1. https://pypi.org/project/cachetools/
> 2. 
> https://github.com/apache/beam/blob/96abacba9b8c7475c753eb3c0b58cca27c46feb1/sdks/python/setup.py#L143
> 3. 
> https://github.com/apache/beam/blob/96abacba9b8c7475c753eb3c0b58cca27c46feb1/sdks/python/setup.py#L104

cachetools sounds like a fine choice to me.
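
(A minimal sketch of how cachetools could back the cross-bundle cache is
below. This is illustrative only; the real implementation needs to key on the
runner-supplied cache tokens discussed elsewhere in this thread.)

import cachetools

# Entries are keyed by (cache_token, state_key), so a new token issued by
# the runner implicitly invalidates stale entries.
state_cache = cachetools.LRUCache(maxsize=1024)

def get_state(state_key, cache_token, fetch_fn):
    key = (cache_token, state_key)
    try:
        return state_cache[key]
    except KeyError:
        value = fetch_fn(state_key)  # fall back to a State API read
        state_cache[key] = value
        return value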


[ANNOUNCE] New committer: Valentyn Tymofieiev

2019-08-26 Thread Robert Bradshaw
Hi,

Please join me and the rest of the Beam PMC in welcoming a new
committer: Valentyn Tymofieiev

Valentyn has made numerous contributions to Beam over the last several
years (including 100+ pull requests), most recently pushing through
the effort to make Beam compatible with Python 3. He is also an active
participant in design discussions on the list, participates in release
candidate validation, and proactively helps keep our tests green.

In consideration of Valentyn's contributions, the Beam PMC trusts him
with the responsibilities of a Beam committer [1].

Thank you, Valentyn, for your contributions and looking forward to many more!

Robert, on behalf of the Apache Beam PMC

[1] 
https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: Brief of interactive Beam

2019-08-26 Thread Robert Bradshaw
On Fri, Aug 23, 2019 at 4:25 PM Ning Kang  wrote:

> On Aug 23, 2019, at 3:09 PM, Robert Bradshaw  wrote:
>
> Cool, sounds like we're getting closer to the same page. Some more replies
> below.
>
> On Fri, Aug 23, 2019 at 1:47 PM Ning Kang  wrote:
>
>> Thanks for the feedback, Robert! I think I got your idea.
>> Let me summarize it to see if it’s correct:
>> 1. You want everything about
>>
>> standard Beam concepts
>>
>>  to follow existing pattern: so we can shot down create_pipeline() and
>> keep the InteractiveRunner notion when constructing pipeline, I agree with
>> it. A runner can delegate another runner, also agreed. Let’s keep it that
>> way.
>>
>
> Despite everything I've written, I'm not convinced that exposing this as a
> Runner is the most intuitive way to get interactivity either. Given that
> the "magic" of interactivity is being able to watch PCollections (for
> inspection and further construction), and if no PCollections are watched
> execution proceeds as normal, what are your thoughts about making all
> pipelines "interactive" and just doing the magic iff there are PCollections
> to watch? (The opt-in incantation here would be ibeam.watch(globals()) or
> similar.)
>
> FWIW, Flume has something similar (called marking collections as to be
> materialized). It has its pros and cons.
>
> By default __main__ is watched, similar to the watch(globals()). If no
> PCollection variable is being watched, it’s not doing any magic.
> I’m not sure about making all pipelines “interactive” such as by adding an
> “interactive=True/False” option when constructing pipeline.
>

My point was that watch(globals()) (or anything else) would be the explicit
opt-in to interactive, instead of doing interactive=True or manually
constructing an InteractiveRunner or anything else.
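
(Concretely, the opt-in might read something like the sketch below; the
import path and API are hypothetical, using the watch()/visualize() names
from the design discussion.)

import apache_beam as beam
# Hypothetical import path; the eventual module layout is part of the design.
from apache_beam.runners import interactive as ib

p = beam.Pipeline()
words = p | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
counts = words | beam.combiners.Count.PerElement()

ib.watch(globals())   # explicit opt-in: these PCollections become inspectable
ib.visualize(counts)  # hypothetical: materialize and display `counts`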


> Since we couldn’t decide which one is more intuitive, I would stick to the
> existing InteractiveRunner constructor that is open sourced.
> And we try to avoid changing any code outside …/runners/interactive/.
>
> Yes, we can stick with what's already there for now to avoid blocking any
implementation work.

> 2. watch() and visualize() can be in the independent interactive beam
>> module since they are
>>
>> concepts that are unique to being interactive
>>
>> 3. I'll add some example for the run_pipeline() in design doc. The short
>> answer is run_pipeline() != p.run(). Thanks for sharing the doc (
>> https://s.apache.org/no-beam-pipeline).
>> As described in the doc, when constructing the pipeline, we still want to
>> bundle a runner and options to the constructed pipeline even in the future.
>> So if the runner is InteractiveRunner, the interactivity instrument
>> (implicitly applied read/write cache PTransform and input/output wiring) is
>> only applied when "run_pipeline()" of the runner implementation is invoked.
>> p.run() will apply the instrument. However, this static function
>> run_pipeline() takes in a new runner and options,
>> invoking “run_pipeline()” implementation of the new runner and wouldn’t
>> have the instrument, thus no interactivity.
>> Because you cannot (don’t want to, as seen in the doc, users cannot
>> access the bundled pipeline/options in the future) change the runner easily
>> without re-executing all the notebook cells, this shorthand function allows
>> a user to run pipeline without interactivity immediately anywhere in a
>> notebook. In the meantime, the pipeline is still bundled with the original
>> Interactive Runner. The users can keep developing further pipelines.
>> The usage of this function is not intuitive until you put it in a
>> notebook user scenario where users develop, test in prod-like env and
>> develop further. And it’s equivalent to users writing
>> "from_runner_api(to_runner_api(pipeline))” in their notebook. It’s just a
>> shorthand.
>>
>
> What you're trying to work around here is the flaw in the existing API
> that a user binds the choice of Runner before pipeline construction, rather
> than at the point of execution. I propose we look at fixing this in Beam
> itself.
>
> Then I would propose not exposing this. If late runner binding is
> supported, we wouldn’t even need this. We can write it in an example
> notebook rather than exposing it.
>

Sounds good.


> 4. And we both agree that implicit cache is palatable and should be the
>> only thing we use to support interactivity. Cache and watched pipeline
>> definition (which tells us what to cache) are the main “hidden state” I
>> meant. Because the cache mechanism is totally implicit and hidden from the
>> use

Re: Python question about save_main_session

2019-08-23 Thread Robert Bradshaw
I suggest re-writing the test to avoid save_main_session.
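
(E.g., keeping everything the workers need importable, or importing inside
the DoFn, removes the dependency on pickling __main__ altogether. A sketch:)

import apache_beam as beam

class FormatFn(beam.DoFn):
    def process(self, element):
        # Importing here (or at module level of an importable module) avoids
        # relying on --save_main_session to ship __main__ state to workers.
        import json
        yield json.dumps(element)

def run(argv=None):
    # No save_main_session: nothing on the workers references __main__ globals.
    with beam.Pipeline(argv=argv) as p:
        p | beam.Create([{'word': 'hello'}]) | beam.ParDo(FormatFn())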

On Fri, Aug 23, 2019 at 11:57 AM Udi Meiri  wrote:

> Hi,
> I'm trying to get pytest with the xdist plugin to run Beam tests. The
> issue is with save_main_session and a dependency of pytest-xdist called
> execnet, which triggers this error:
>
> apache_beam/examples/complete/tfidf.py:212: in run
>     output | 'write' >> WriteToText(known_args.output)
> apache_beam/pipeline.py:426: in __exit__
>     self.run().wait_until_finish()
> apache_beam/pipeline.py:406: in run
>     self._options).run(False)
> apache_beam/pipeline.py:416: in run
>     pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
> apache_beam/internal/pickler.py:282: in dump_session
>     dill.load_session(file_path)
> ../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/dill/_dill.py:410: in load_session
>     module = unpickler.load()
> ../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/execnet/gateway_base.py:130: in __getattr__
>     locs = self._importdef.get(name)
> ../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/execnet/gateway_base.py:130: in __getattr__
>     locs = self._importdef.get(name)
> ../../../../virtualenvs/beam-py35/lib/python3.5/site-packages/execnet/gateway_base.py:130: in __getattr__
>     locs = self._importdef.get(name)
> E   RecursionError: maximum recursion depth exceeded
> !!! Recursion detected (same locals & position)
>
>
> Does anyone on this list have experience with these kinds of errors? Any
> workarounds I can use? (can we handle this module specially / can we
> exclude it from main session?)
>


Re: Brief of interactive Beam

2019-08-23 Thread Robert Bradshaw
underlying runners.”
>
>The caching mechanism / the magic that helps the interactivity
> instrumenting process might need different implementation for different
> underlying runners. Because the runner can be anywhere deployed in any
> architecture, the notebook is just a process on a machine. They need to
> work together.
>Currently, we have the local file based cache. If we run a pipeline
> with underlying_runner as DataflowRunner, we’ll need something like GCS
> based cache. An in-memory cache might be runner agnostic, but it might
> explode with big data source.
>

Yep, we need a filesystem/directory to use as a cache. We have an existing
temp_location flag that we can use for this (and is required for
distributed runners). If unset we can default to a local temp dir (which
works for the direct runner).


>Existing InteractiveRunner has the following portability
> <https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive#portability>.
> That’s why I said the interactivity (implementation) needs to be tailored
> for different underlying runners.
>If we allow users to pass in all kinds of underlying runners (even
> their in-house ones), we have to support the interactivity for all of them
> which we probably don't. That’s why we wanted a create_pipeline() wrapper
> so that in notebook, when building a pipeline, bundle to DirectRunner by
> default.
>The focus on the Direct Runner is also related to our objective: we
> want to provide easy-to-use notebook and some notebook environment where
> users can interactively execute pipelines without worrying about setup
> (especially when the setup is not Beam but Interactive Beam related).
> 6. We don’t fix typos for user-defined transforms
>
> I'm talking about pruning like having a cell with
>
> pcoll | beam.Map(lambda x: expression_with_typo)
>
> and then fixing it (and re-evaluating) with
>
> pcoll | beam.Map(lambda x: expression_with_typo_fixed)
>
> where the former Map would *always* fail and never get removed from the
> pipeline.
>
> We never change the pipeline defined by the user. Interactivity is applied
> to a copy of user defined pipeline.
>

Sure. But does the executed (copy) of the pipeline contain the bad Map
operation in it? If so, it in essence "poisons" the entire pipeline,
forcing a user to re-create and re-define it from the start to make forward
progress (which results in quite a poor user experience--the errors in cell
N manifest in cell M, but worse fixing and re-executing cell N doesn't fix
cell M). If not, how is it intelligently excluded (and in a way that is not
too dis-similar from non-interactive mode, and doesn't cause surprises with
the p.run vs. run_pipeline difference)?


> 7.
>
> One approach was that the pipeline construction is re-executed every time
> (i.e the "pipeline" object to run is really a callback, like a callable or
> a PTransform) and then there's no ambiguity here.
>
> I didn’t quite get it. Pipeline construction only happens when a user
> executes a cell with the pipeline construction code.
> Are you suggesting changing the logic in pipeline.apply() to always
> reapply/replace a NamedPTransform? I don’t think we (Interactive Beam) can
> decide that because it changes the behavior of Beam.
> We had some thought of subclassing pipeline and use the create_pipeline()
> method to create the subclassed pipeline object. Then intercept the
> pipeline.apply() to always replace PTransform with existing full label and
> apply logic of parent pipeline’s apply() logic.
> It seems to be a no go to me now.
>

Sorry I wasn't clear. I'm referring to the style in the above doc about
getting rid of the Pipeline object (as a long-lived thing at least). In
this case the actual execution of pipeline construction never spans
multiple cells (though its implementation might via function calls) so one
never has out-of-date transforms dangling off the pipeline object.


> This has the downsides of recreating the PCollectiion objects which are
> being used as handles (though perhaps they could be re-identified).
>
> If a user re-executes a cell with PCollection = p | PTransform, the
> PCollection object will be a new instance. That is not a downside.
> We can keep the existing behavior of Beam to always raise an error when
> the cell with named PTransform is re-executed.
>
> Thanks!
>
> Ning.
>
>
>
> On Aug 23, 2019, at 11:36 AM, Robert Bradshaw  wrote:
>
> On Wed, Aug 21, 2019 at 3:33 PM GMAIL  wrote:
>
>> Thanks for the input, Robert!
>>
>> On Aug 21, 2019, at 11:49 AM, Robert Bradshaw 
>> wrote:
>>
>> On Wed, Aug 14, 2019 at 11:29 AM Ning Kang  wrote:
>>
>>> Ahmet, thanks for forwarding

Re: Brief of interactive Beam

2019-08-23 Thread Robert Bradshaw
On Wed, Aug 21, 2019 at 3:33 PM GMAIL  wrote:

> Thanks for the input, Robert!
>
> On Aug 21, 2019, at 11:49 AM, Robert Bradshaw  wrote:
>
> On Wed, Aug 14, 2019 at 11:29 AM Ning Kang  wrote:
>
>> Ahmet, thanks for forwarding!
>>
>>
>>> My main concern at this point is the introduction of new concepts, even
>>> though these are not changing other parts of the Beam SDKs. It would be
>>> good to see at least an alternative option covered in the design document.
>>> The reason is each additional concept adds to the mental load of users. And
>>> also concepts from interactive Beam will shift user's expectations of Beam
>>> even though there are not direct SDK modifications.
>>
>>
>> Hi Robert. About the concern, I think I have a few points:
>>
>>1. *Interactive Beam (or Interactive Runner) is already an existing
>>"new concept" that normal Beam user could opt-in if they want an
>>interactive Beam experience.* They need to do lots of setup steps and
>>learn new things such as Jupyter notebook and at least interactive_runner
>>module to make it work and make use of it.
>>
>> I think we should start with the perspective that most users interested
> in using Beam interactively already know about Jupyter notebooks, or at
> least ipython, and would want to use it to learn (and more effectively use)
> Beam.
>
> Yes, I agree with the perspective for users who are familiar with
> notebook. Yet it doesn’t prevent us from creating ready-to-use containers
> (such as binder <https://github.com/jupyterhub/binderhub>)  for users who
> want to try Beam interactively without setting up a environment with all
> the dependencies interactive Beam introduces. I agree that experienced
> users understand how to set up additional dependencies and read examples,
> it’s just we are also targeting other entry level audiences.
> But back to the original topic, the design is not trying to add new
> concept, but fixing some rough edges of existing Interactive Beam features.
> We can discuss whether a factory of create_pipeline() is really desired and
> decide whether to expose it later. We hope the interactive_beam module to
> be the only module an Interactive Beam user would directly invoke in their
> notebook.
>

My goal would be that one uses a special interactive module for those
concepts that are unique to being interactive, and standard Beam concepts
(rather than replacements or wrappers) otherwise.

>
>>1. *The behavior of existing interactive Beam is different from
>>normal Beam because of the interactive nature and the users would expect
>>that.* And the users wouldn't shift their expectation of normal Beam.
>>Just like running Python scripts might result in different behavior than
>>running all of them in an interactive Python session.
>>
> I'm not quite following this. One of the strengths of Python
> is the lack of difference between interactive and non-interactive
> behavior. (The fact that a script's execution is always in top to bottom
> order, unlike a notebook, is the primary difference.)
>
> Sorry for the confusion. What I’m saying is about the hidden states.
> Running several Python scripts from top to bottom in an IPython session
> might generate different effects than running them in the same order
> normally. Say if you have an in-memory global configuration that is shared
> among all the scripts and if it’s missing, a script initializes one.
> Running the scripts in IPython will pass the initialization and
> modification of configuration along the scripts. While running the scripts
> one by one will initialize different configurations. Running cells in a
> notebook is equivalent to appending the cells into a script and run it. The
> interactivity is not about the order, but if there is hidden states
> preserved between each statement or each script execution. And the users
> should expect that there might be hidden states when they are in an
> interactive environment because that is exactly the interactivity they
> expect. However, they don’t hold the hidden states, the session does it for
> them. A user wouldn’t need to explicitly say “preserve the variable x I’ve
> defined in this cell because I want to reuse it in some other cells I’m
> going to execute”. The user can directly access variable x once the cell
> defining x is executed. And even if the user deletes the cell defining x, x
> still exists. At that stage, no one would know there is a variable x in
> memory by just looking at the notebook. One would see a missing execution
> sequence (on top left of each executed cell) and wonder where the piece of
> 

Re: Brief of interactive Beam

2019-08-21 Thread Robert Bradshaw
active apis/experiences for different runners. In particular, the many
instances to the DirectRunner are worrisome--what's special about the
DirectRunner that other runners cannot provide that's needed for
interactive? If we can't come up with a good answer to that, we should not
impose this restriction.

>
>1. *When users run pipeline built from interactive runner in a
>non-interactive environment, it's direct runner like any other Beam
>tutorial demonstrates*. It's even easier because the user doesn't need
>to specify the runner nor pass in options.
>
>  So is the idea to have code like

if is_ipython() or is_jupyter() or is ...:
  do_something()
else:
  do_another_thing()

I'd really like to avoid this as it means one will (quite surprisingly, and
possibly for subtle reasons) not be able to copy code from a notebook elsewhere. Or
did you mean something else here?

>
>1. *Interactive Beam is solving an orthogonal set of problems than
>Beam*. You can think of it as a wrapper of Beam that enables
>interactivity and it's not even a real runner. It doesn't change the Beam
>model such as how you build a pipeline. And with the Beam portability, you
>get the capability to run the pipeline built from interactive runner with
>other runners for free. It adds the interactive behavior that a user
>expects.
>2. *We want to open source it though we can iterate faster without
>doing it*. The whole project can be encapsulated in a completely
>irrelevant repository and from a developer's perspective, I want to hide
>all the implementation details from the interactive Beam user. However, as
>there is more and more desire for interactive Beam (+Mehran Nazir
> for more details), we want to share the
>implementation with others who want to contribute and explore the
>interactive world.
>
> I would much rather see interactivity as part of the Beam project. With
good APIs the implementations don't have to be tightly coupled (e.g. the
underlying runner delegation) but I think it will be a better user
experience if interactive was a mode rather than a wrapper with different
entry points.


I think watch() is a really good solution to knowing which collections to
cache, and visualize() will be very useful.

One thing I don't see tackled at all yet is the fact that pipelines are
only ever mutated by appending on new operations, so some design needs to
be done in terms of how to remove (possibly error-causing) operations or
replace bad ones with fixed ones. This is where most of the unsolved
problems lie.

Also +David Yan   for more opinions.
>
> Thanks!
>
> Ning.
>
> On Tue, Aug 13, 2019 at 6:00 PM Ahmet Altay  wrote:
>
>> Ning, I believe Robert's questions from his email has not been answered
>> yet.
>>
>> On Tue, Aug 13, 2019 at 5:00 PM Ning Kang  wrote:
>>
>>> Hi all, I'll leave another 3 days for design
>>> <https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit?usp=sharing>
>>>  review.
>>> Then we can have a vote session if there is no objection.
>>>
>>> Thanks!
>>>
>>> On Fri, Aug 9, 2019 at 12:14 PM Ning Kang  wrote:
>>>
>>>> Thanks Ahmet for the introduction!
>>>>
>>>> I've composed a design overview
>>>> <https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit?usp=sharing>
>>>> describing changes we are making to components around interactive runner.
>>>> I'll share the document in our email thread too.
>>>>
>>>> The truth is since interactive runner is not yet a recognized runner as
>>>> part of the Beam SDK (and it's fundamentally a wrapper around direct
>>>> runner), we are not touching any Beam SDK components.
>>>> We'll not change any behavior of existing Beam SDK and we'll try our
>>>> best to keep it that way in the future.
>>>>
>>>
>> My main concern at this point is the introduction of new concepts, even
>> though these are not changing other parts of the Beam SDKs. It would be
>> good to see at least an alternative option covered in the design document.
>> The reason is each additional concept adds to the mental load of users. And
>> also concepts from interactive Beam will shift user's expectations of Beam
>> even though there are not direct SDK modifications.
>>
>>
>>>
>>>> In the meantime, I'll work on other components orthogonal to Beam such
>>>> as Pipeline Display and Data Visualization I mentioned in the design
>>>> overview.
>>>>
>>>> If you have any questions, please feel

Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-20 Thread Robert Bradshaw
The original timestamps are probably being assigned in the
watchForNewFiles transform, which is also setting the watermark:

https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668

Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
probably makes sense to be able to customize the lag here.
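
(As a stopgap for the error itself, the DoFn can declare an allowed timestamp
skew. The sketch below is based on the ReadWithEventTime snippet quoted later
in this thread; getAllowedTimestampSkew() is deprecated, but it is currently
the only knob for outputting behind the input timestamp.)

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Nested inside the pipeline class, as in the original snippet.
static class ReadWithEventTime extends DoFn<String, String> {
    // Deprecated, but currently the only way to permit outputs with
    // timestamps earlier than the current input's (see BEAM-644).
    @Override
    public Duration getAllowedTimestampSkew() {
        return Duration.millis(Long.MAX_VALUE);
    }

    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<String> out) {
        out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
    }
}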

On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao  wrote:
>
> Hi Theodore,
>
> Thanks again for your insight and help. I'd like to learn more about how we 
> got the timestamp from WindowedValue initially from +dev@beam.apache.org
>
> -Chengzhi
>
> On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu  wrote:
>>
>> Hi Chengzhi,
>>
>> I'm not completely sure where/how the timestamp is set for a ProcessContext 
>> object. Here is the error code found within the Apache Beam repo.
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
>> which makes reference to `elem.getTimestamp()` where elem is a WindowedValue.
>>
>> I am thinking +dev@beam.apache.org can offer some insight. Would be 
>> interested to find out more myself.
>>
>> -Theo
>>
>> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao  
>> wrote:
>>>
>>> Hi Theodore,
>>>
>>> Thanks for your reply. This is just a simple example that I tried to 
>>> understand how event time works in Beam. I could have more fields and I 
>>> would have an event time for each of record, so I tried to let Beam know 
>>> which filed is the event time to use for later windowing and computation.
>>>
>>> I think we you mentioned the probable reason sounds reasonable, I am still 
>>> trying to figure out in the error message "current input 
>>> (2019-08-16T12:39:06.887Z)" is coming from if you have any insight on it.
>>>
>>> Thanks a lot for your help.
>>>
>>> -- Chengzhi
>>>
>>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu  wrote:

 Hi Chengzhi,

 Are you simply trying to emit the timestamp onward? Why not just use 
 `out.output` with an PCollection?

 static class ReadWithEventTime extends DoFn<String, Instant> {
     @DoFn.ProcessElement
     public void processElement(@Element String line,
                                OutputReceiver<Instant> out) {
         out.output(new Instant(Long.parseLong(line)));
     }
 }

 You can also output the line itself as a PCollection. If your line 
 has additional information to parse, consider a KeyValue Pair 
 https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
  where you can emit both some parsed context of the string and the 
 timestamp.

 The probable reason why outputWithTimestamp doesn't work with older times 
 is that the timestamp emitted is used specifically for windowing and for 
 streaming type Data pipelines to determine which window each record 
 belongs for aggregations.

 -Theo


 On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao  
 wrote:
>
> Hi folks,
>
> I am new to Beam and try to play with some example, I am running Beam 
> 2.14 with Direct runner to read some files (I continue generated).
>
> I am facing this error: Cannot output with timestamp 
> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the 
> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the 
> allowed skew (0 milliseconds). I searched online but still don't quite 
> understand it so I am asking here for some help.
>
> A file has some past timestamp in it:
> 1565958615120
> 1565958615120
> 1565958615121
>
> My code looks something like this:
>
>    static class ReadWithEventTime extends DoFn<String, String> {
>        @ProcessElement
>        public void processElement(@Element String line,
>                                   OutputReceiver<String> out) {
>            out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>        }
>    }
>
> public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
>
> String sourcePath = new File("files/").getPath();
>
> PCollection data = pipeline.apply("ReadData",
> TextIO.read().from(sourcePath + "/test*")
> .watchForNewFiles(Duration.standardSeconds(5), 
> Watch.Growth.never()));
>
> data.apply("ReadWithEventTime", ParDo.of(new 
> ReadWithEventTime()));
>
> pipeline.run().waitUntilFinish();
>
> }
>
>
> I am trying to understand in the error message where "current input 
> (2019-08-16T12:39:06.887Z)" is comming from. Is it the lowest watermark 
> when I start my application? If that's the case, is there a way that I 
> can change the initial watermark?
>
> Also, I can setup `withAllowedTimestampSkew` but it looks like it has 

Re: [PROPOSAL] An initial Schema API in Python

2019-08-20 Thread Robert Bradshaw
On Mon, Aug 19, 2019 at 5:44 PM Ahmet Altay  wrote:
>
>
>
> On Mon, Aug 19, 2019 at 9:56 AM Brian Hulette  wrote:
>>
>>
>>
>> On Fri, Aug 16, 2019 at 5:17 PM Chad Dombrova  wrote:

 >> Agreed on float since it seems to trivially map to a double, but I’m 
 >> torn on int still. While I do want int type hints to work, it doesn’t 
 >> seem appropriate to map it to AtomicType.INT64, since it has a 
 >> completely different range of values.
 >>
 >> Let’s say we used native int for the runtime field type, not just as a 
 >> schema declaration for numpy.int64. What is the real world fallout from 
 >> this? Would there be data loss?
 >
 > I'm not sure I follow the question exactly, what is the interplay 
 > between int and numpy.int64 in this scenario? Are you saying that 
 > np.int64 is used in the schema declaration, but we just use native int 
 > at runtime, and check the bit width when encoding?
 >
 > In any case, I don't think the real world fallout of using int is nearly 
 > that dire. I suppose data loss is possible if a poorly designed pipeline 
 > overflows an int64 and crashes,

 The primary risk is that it *won't* crash when overflowing an int64,
 it'll just silently give the wrong answer. That's much less safe than
 using a native int and then actually crashing in the case it's too
 large at the point one tries to encode it.
>>>
>>>
>>> If the behavior of numpy.int64 is less safe than int, and both support 
>>> 64-bit integers, and int is the more intuitive type to use, then that seems 
>>> to make a strong case for using int rather than numpy.int64.
>>>
>>
>> I'm not sure we established numpy.int64 is less safe, just that a silent 
>> overflow is a risk.

Silent overflows are inherently less safe, especially for a language
where users in general never have to deal with this.

>> By default numpy will just log a warning when an overflow occurs, so it's 
>> not totally silent, but definitely risky. numpy can however be made to throw 
>> an exception when an overflow occurs with `np.seterr(over='raise')`.

Warning logs on remote machines are unlikely to ever be seen. Even if
one knew about the numpy setting (keep in mind the user may not ever
directly use or import numpy), it doesn't seem to work (and one would
have to set it on the remote workers, or propagate this setting if set
in the main program).

In [1]: import numpy as np
In [2]: np.seterr(over='raise')  # returns previous value
Out[2]: {'divide': 'warn', 'invalid': 'warn', 'over': 'warn', 'under': 'ignore'}
In [3]: np.int64(2**36) * np.int64(2**36)
Out[3]: 0

>> Regardless of what type is used in the typing representation of a schema, 
>> we've established that RowCoder.encode should accept anything convertible to 
>> an int for integer fields. So it will need to check it's width and raise an 
>> error if it's too large.
>> I added some tests last week to ensure that RowCoder does this [1]. However 
>> they're currently skipped because I'm unsure of the proper place to raise 
>> the error. I wrote up the details in a comment [2] (sorry I did a force push 
>> so the comment doesn't show up in the appropriate place).
>>
>> Note that when decoding an INT32/64 field RowCoder still produces plain old 
>> ints (since it relies on VarIntCoder), so int really is the runtime type, 
>> and the numpy types are just for the typing representation of a schema.
>>
>> I also updated my PR to accept int, float, and str in the typing 
>> representation of a schema, and added the following summary of type mappings 
>> to typehints.schema [1], since it's not readily apparent from the code 
>> itself:
>
>
> Cool!
>
>>
>>
>> Python  Schema
>> np.int8 <-> BYTE
>> np.int16<-> INT16
>> np.int32<-> INT32
>> np.int64<-> INT64
>> int ---/
>> np.float32  <-> FLOAT
>> np.float64  <-> DOUBLE
>> float   ---/
>> bool<-> BOOLEAN
>> The mappings for STRING and BYTES are different between python 2 and python 
>> 3,
>> because of the changes to str:
>> py3:
>> str/unicode <-> STRING
>> bytes   <-> BYTES
>> ByteString  ---/
>> py2:
>> unicode <-> STRING
>> str/bytes   ---/
>> ByteString  <-> BYTES
>>
>> As you can see, int and float typings can now be used to create a schema 
>> with an INT64 or DOUBLE attribute, but when creating an anonymous NamedTuple 
>> sub-class from a schema, the numpy types are preferred. I prefer that 
>> approach, if only for symmetry with the other integer and floating point 
>> types, but I can change it to prefer int/float if I'm the only one that 
>> feels that way.

Just to be clear, this is just talking about the schema itself (as at
that level, due to the many-to-one mapping above, no distinction is
made between int vs. int64). The runtime types are still int/float,
right?
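
(For concreteness, the kind of width check being discussed for INT64 fields
is sketched below; where exactly the error gets raised is still open per the
PR comments.)

_INT64_MIN, _INT64_MAX = -(1 << 63), (1 << 63) - 1

def check_int64(value):
    # Accept anything convertible to int, but fail loudly rather than
    # silently overflowing at encode time.
    v = int(value)
    if not _INT64_MIN <= v <= _INT64_MAX:
        raise ValueError('%r is out of range for an INT64 field' % (value,))
    return v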

> Just an opinion: As a user I would expect anonymous types created for me to 
> have 

Re: Java 11 compatibility question

2019-08-09 Thread Robert Bradshaw
On Fri, Aug 9, 2019 at 12:48 PM Michał Walenia 
wrote:

> From what I understand, the Java 8 -> 11 testing isn't in essence similar
> to py2 -> py3 checks.
>

True. Python 3 is in many ways a new language, and much less (and more
subtly) backwards compatible. You also can't "link" Python 3 code against
Python 2 code the way you can use old Java classes in new JVMs.


> In the case of Java, all we want to do is check if Beam downloaded by
> users from Maven (and compiled with JDK8) won't act up if used from a
> JDK/JRE 11 environment. We don't want to migrate the tool itself to a newer
> language version. As I mentioned in my previous email, there already are
> test suites checking compatibility - ValidatesRunner on Direct and Dataflow
> runners running in normal and portable mode.
> Those tests keep passing, so I believe we're mostly fine regarding
> compatibility.
> All I want to know is - is this enough?
> How else can we test Beam to be sure it works in JRE 11? After several
> accidental launches of build tasks in JDK 11, I am sure that it's not
> buildable with it, but this is not the compatibility type we want to check.
>

Well, we will want this eventually. Before that, we'll want to be sure
users can build their Java 11 code against our artifacts.


>
> Thank you for your replies,
> Michal
>
>
> On Thu, Aug 8, 2019 at 10:25 PM Valentyn Tymofieiev 
> wrote:
>
>> From Python 3 migration standpoint, some high level pillars that increase
>> our confidence are:
>> - Test coverage: (PreCommit, PostCommit), creating a system to make it
>> easy for add test coverage in new language for new functionality.
>> - Support of new language version by core runners + ValidatesRunner test
>> coverage.
>> - Test of time: offer new functionality in a few releases, monitor &
>> address user feedback.
>>
>> Dependency audit and critical feature support in new language, as
>> mentioned by others, are important  points. If you are curious about
>> detailed AIs that went into Python 3 support, feel free to look into
>> BEAM-1251 or Py3 Kanban Board (
>> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=245&view=detail
>> ).
>>
>> Thanks,
>> Valentyn
>>
>>
>> On Thu, Aug 8, 2019 at 7:24 PM Mark Liu  wrote:
>>
>>> Some actions we did for py2 to py3 works:
>>> - Check and resolve incompatible dependencies.
>>> - Enable py3 lint.
>>> - Fill feature gaps between py2 and py3 (e.g. new py3 container, new
>>> solution for type hint)
>>> - Add unit tests, integration tests and other tests on py3 for coverage.
>>> - Release (p3) and deprecation (p2) plan.
>>>
>>> Hope this helps on Java upgrade.
>>>
>>> Mark
>>>
>>> On Wed, Aug 7, 2019 at 3:19 PM Ahmet Altay  wrote:
>>>


 On Wed, Aug 7, 2019 at 12:21 PM Elliotte Rusty Harold <
 elh...@ibiblio.org> wrote:

> gRPC bug here: https://github.com/grpc/grpc-java/issues/3522
>
> google-cloud-java bug:
> https://github.com/googleapis/google-cloud-java/issues/5760
>
> Neither has a cheap or easy fix, I'm afraid. Commenting on these
> issues might help us prove that there's a demand to priorotize these
> compared to other work. If anyone has a support contract and could
> file a ticket asking for a fix, that would help even more.
>
> Those are the two I know about. There might be others elsewhere in the
> dependency tree.
>
>
> On Wed, Aug 7, 2019 at 2:25 PM Lukasz Cwik  wrote:
> >
> > Since java8 -> java11 is similar to python2 -> python3 migration,
> what was the acceptance criteria there?
>

 I do not remember formally discussing this. The bar used was, all
 existing tests will pass for python2 and python3. New tests will be added
 for python3 specific features. (To avoid any confusion this bar has not
 been cleared yet.)

 cc: +Valentyn Tymofieiev  could add more details.


> >
> > On Wed, Aug 7, 2019 at 1:54 PM Elliotte Rusty Harold <
> elh...@ibiblio.org> wrote:
> >>
> >>
> >>
> >> On Wed, Aug 7, 2019 at 9:41 AM Michał Walenia <
> michal.wale...@polidea.com> wrote:
> >>>
> >>>
> >>> Are these tests sufficient to say that we’re java 11 compatible?
> What other aspects do we need to test to be able to say that?
> >>>
> >>>
> >>
> >> Are any packages split across multiple jar files, including
> packages beam depends on? That's the one that's bitten some other
> projects, including google-cloud-java and gRPC. If so, beam is not going 
> to
> work with the module system.
> >>
> >> Work is ongoing to fix split packages in both gRPC and
> google-cloud-java, but we're not very far down that path and I think it's
> going to be an API breaking change.
> >>
> > Romain pointed this out earlier and I fixed the last case of
> packages being split across multiple jars within Apache Beam but as you
> point out our transitive dependencies are not ready.

Re: Write-through-cache in State logic

2019-08-09 Thread Robert Bradshaw
The question is whether the SDK needs to wait for the StateResponse to
come back before declaring the bundle done. The proposal was to not
send the cache token back as part of an append StateResponse [1], but
pre-provide it as part of the bundle request.

Thinking about this some more, if we assume the state response was
successfully applied, there's no reason for the SDK to block the
bundle until it has its hands on the cache token--we can update the
cache once the StateResponse comes back whether or not the bundle is
still active. On the other hand, the runner needs a way to assert it
has received and processed all StateRequests from the SDK associated
with a bundle before it can declare the bundle complete (regardless of
the cache tokens), so this might not be safe without some extra
coordination (e.g. the ProcessBundleResponse indicating the number of
state requests associated with a bundle).

[1] 
https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
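To sketch the non-blocking flavor of this in Python (names like
append_async and the (cache_token, state_key) cache key are assumptions
for illustration, not the actual Fn API surface):

def cached_append(state_cache, cache_token, state_key, value, state_handler):
    # Issue the append without blocking the bundle on the StateResponse.
    future = state_handler.append_async(state_key, value)  # hypothetical call

    def on_response(f):
        # Update the cache whether or not the bundle is still active; the
        # runner separately tracks outstanding requests per bundle.
        if f.exception() is None:
            state_cache.extend((cache_token, state_key), [value])

    future.add_done_callback(on_response)
    return future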

On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik  wrote:
>
> The purpose of the new state API call in BEAM-7000 is to tell the runner that 
> the SDK is now blocked waiting for the result of a specific state request and 
> it should be used for fetches (not updates) and is there to allow for SDKs to 
> differentiate readLater (I will need this data at some point in time in the 
> future) from read (I need this data now). This comes up commonly where the 
> user prefetches multiple state cells and then looks at their content allowing 
> the runner to batch up those calls on its end.
>
> The way it can be used for clear+append is that the runner can store requests 
> in memory up until some time/memory limit or until it gets its first 
> "blocked" call and then issue all the requests together.
>
>
> On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw  wrote:
>>
>> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise  wrote:
>> >
>> > That would add a synchronization point that forces extra latency 
>> > especially in streaming mode.
>> >
>> > Wouldn't it be possible for the runner to assign the token when starting 
>> > the bundle and for the SDK to pass it along the state requests? That way, 
>> > there would be no need to batch and wait for a flush.
>>
>> I think it makes sense to let the runner pre-assign these state update
>> tokens rather than forcing a synchronization point.
>>
>> Here's some pointers for the Python implementation:
>>
>> Currently, when a DoFn needs UserState, a StateContext object is used
>> that converts from a StateSpec to the actual value. When running
>> portably, this is FnApiUserStateContext [1]. The state handles
>> themselves are cached at [2] but this context only lives for the
>> lifetime of a single bundle. Logic could be added here to use the
>> token to share these across bundles.
>>
>> Each of these handles in turn invokes state_handler.get* methods when
>> its read is called. (Here state_handler is a thin wrapper around the
>> service itself) and constructs the appropriate result from the
>> StateResponse. We would need to implement caching at this level as
>> well, including the deserialization. This will probably require some
>> restructuring of how _StateBackedIterable is implemented (or,
>> possibly, making that class itself cache aware). Hopefully that's
>> enough to get started.
>>
>> [1] 
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
>> [2] 
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
>>
>> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik  wrote:
>> >>
>> >> I believe the intent is to add a new state API call telling the runner 
>> >> that it is blocked waiting for a response (BEAM-7000).
>> >>
>> >> This should allow the runner to wait till it sees one of these I'm 
>> >> blocked requests and then merge + batch any state calls it may have at 
>> >> that point in time allowing it to convert clear + appends into set calls 
>> >> and do any other optimizations as well. By default, the runner would have 
>> >> a time and space based limit on how many outstanding state calls there 
>> >> are before choosing to resolve them.
>> >>
>> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik  wrote:
>> >>>
>> >>> Now I see what you mean.
>> >>>
>> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:
>> >>>>

Re: (mini-doc) Beam (Flink) portable job templates

2019-08-09 Thread Robert Bradshaw
The expansion service is a separate service. (The flink jar happens to
bring both up.) However, there is negotiation to receive/validate the
pipeline options.

On Fri, Aug 9, 2019 at 1:54 AM Thomas Weise  wrote:
>
> We would also need to consider cross-language pipelines that (currently) 
> assume the interaction with an expansion service at construction time.
>
> On Thu, Aug 8, 2019, 4:38 PM Kyle Weaver  wrote:
>>
>> > It might also be useful to have the option to just output the proto and 
>> > artifacts, as an alternative to the jar file.
>>
>> Sure, that wouldn't be too big a change if we were to decide to go the SDK 
>> route.
>>
>> > For the Flink entry point we would need to allow for the job server to be 
>> > used as a library.
>>
>> We don't need the whole job server, we only need to add a main method to 
>> FlinkPipelineRunner [1] as the entry point, which would basically just do 
>> the setup described in the doc then call FlinkPipelineRunner::run.
>>
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L53
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Aug 8, 2019 at 4:21 PM Thomas Weise  wrote:
>>>
>>> Hi Kyle,
>>>
>>> It might also be useful to have the option to just output the proto and 
>>> artifacts, as an alternative to the jar file.
>>>
>>> For the Flink entry point we would need to allow for the job server to be 
>>> used as a library. It would probably not be too hard to have the Flink job 
>>> constructed via the context execution environment, which would require no 
>>> changes on the Flink side.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Thu, Aug 8, 2019 at 9:52 AM Kyle Weaver  wrote:
>>>>
>>>> Re Javaless/serverless solution:
>>>> I take it this would probably mean that we would construct the jar 
>>>> directly from the SDK. There are advantages to this: full separation of 
>>>> Python and Java environments, no need for a job server, and likely a 
>>>> simpler implementation, since we'd no longer have to work within the 
>>>> constraints of the existing job server infrastructure. The only downside I 
>>>> can think of is the additional cost of implementing/maintaining jar 
>>>> creation code in each SDK, but that cost may be acceptable if it's simple 
>>>> enough.
>>>>
>>>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>>>
>>>>
>>>> On Thu, Aug 8, 2019 at 9:31 AM Thomas Weise  wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 8, 2019 at 8:29 AM Robert Bradshaw  
>>>>> wrote:
>>>>>>
>>>>>> > Before assembling the jar, the job server runs to create the 
>>>>>> > ingredients. That requires the (matching) Java environment on the 
>>>>>> > Python developer's machine.
>>>>>>
>>>>>> We can run the job server and have it create the jar (and if we keep
>>>>>> the job server running we can use it to interact with the running
>>>>>> job). However, if the jar layout is simple enough, there's no need to
>>>>>> even build it from Java.
>>>>>>
>>>>>> Taken to the extreme, this is a one-shot, jar-based JobService API. We
>>>>>> choose a standard layout of where to put the pipeline description and
>>>>>> artifacts, and can "augment" an existing jar (that has a
>>>>>> runner-specific main class whose entry point knows how to read this
>>>>>> data to kick off a pipeline as if it were a users driver code) into
>>>>>> one that has a portable pipeline packaged into it for submission to a
>>>>>> cluster.
>>>>>
>>>>>
>>>>> It would be nice if the Python developer doesn't have to run anything 
>>>>> Java at all.
>>>>>
>>>>> As we just discussed offline, this could be accomplished by  including 
>>>>> the proto that is produced by the SDK into the pre-existing jar.
>>>>>
>>>>> And if the jar has an entry point that creates the Flink job in the 
>>>>> prescribed manner [1], it can be directly submitted to the Flink REST 
>>>>> API. That would allow for Java free client.
>>>>>
>>>>> [1] 
>>>>> https://lists.apache.org/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
>>>>>


Re: Beam Python Portable Runner - Adding timeout to JobServer grpc calls

2019-08-09 Thread Robert Bradshaw
If we do provide a configuration value for this, I would make it have a
fairly large default and re-use the flag for all RPCs of a similar nature,
rather than tweaking it for this particular service only.
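As a sketch, assuming the --job-server-request-timeout option from the
original proposal (the attribute name and the 600s fallback are assumptions
for illustration):

timeout_secs = portable_options.job_server_request_timeout or 600
# grpc-generated stubs take a per-call timeout, so one value can be
# threaded through all RPCs of this kind:
prepare_response = job_service.Prepare(prepare_request, timeout=timeout_secs)
state_response = job_service.GetState(state_request, timeout=timeout_secs)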

On Fri, Aug 9, 2019 at 2:58 AM Ahmet Altay  wrote:

> Default plus a flag to override sounds reasonable. Although from Dataflow
> experience I do not remember timeouts causing issues and each new added
> flag adds complexity. What do others think?
>
> On Thu, Aug 8, 2019 at 11:38 AM Kyle Weaver  wrote:
>
>> If we do make a default, I still think it should be configurable via a
>> flag. I can't think of why the prepare, stage artifact, job state, or job
>> message requests might take more than 60 seconds, but you never know,
>> particularly with artifact staging, which might be uploading artifacts to
>> distributed storage.
>>
>> I assume the run request itself would not be subject to timeouts, as
>> running the pipeline can be assumed to take significantly longer than the
>> setup work.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Thu, Aug 8, 2019 at 11:20 AM Enrico Canzonieri 
>> wrote:
>>
>>> Default timeout with no flag may work as well.
>>> The main consideration here is whether some api calls may take longer
>>> than 60 seconds because of the complexity of the users' Beam pipeline. E.g.
>>> Could job_service.Prepare() take longer than 60 seconds if the given Beam
>>> pipeline is extremely complex?
>>>
>>> Basically if there are cases when the user code may cause the call
>>> duration to increase to the point the timeout prevents submitting the app
>>> itself then we should consider having a flag.
>>>
>>> On 2019/08/07 20:13:12, Ahmet Altay wrote:
>>> > Could we pick a default timeout value instead of introducing a flag?
>>> We use>
>>> > 60 seconds as the default timeout for http client [1], we can do the
>>> same>
>>> > here.>
>>> >
>>> > [1]>
>>> >
>>> https://github.com/apache/beam/blob/3a182d64c86ad038692800f5c343659ab0b935b0/sdks/python/apache_beam/internal/http_client.py#L32>
>>>
>>> >
>>> > On Wed, Aug 7, 2019 at 11:53 AM enrico canzonieri >
>>> > wrote:>
>>> >
>>> > > Hello,>
>>> > >>
>>> > > I noticed that the calls to the JobServer from the Python SDK do not
>>> have>
>>> > > timeouts. If I'm not mistaken that means that the call to
>>> pipeline.run()>
>>> > > could hang forever if the JobServer is not running (or failing to
>>> start).>
>>> > > E.g.>
>>> > >
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307>
>>>
>>> > > the call to Prepare() doesn't provide any timeout value and the
>>> same>
>>> > > applies to other JobServer requests.>
>>> > > I was considering adding a --job-server-request-timeout to the>
>>> > > PortableOptions>
>>> > > >
>>> > > class to be used in the JobServer interactions inside
>>> portable_runner.py.>
>>> > > Is there any specific reason why the timeout is not currently
>>> supported?>
>>> > > Does anybody have any objection adding the jobserver timeout? I
>>> could>
>>> > > volunteer to file a ticket and submit a pr for this.>
>>> > >>
>>> > > Cheers,>
>>> > > Enrico Canzonieri>
>>> > >>
>>> >
>>>
>>>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread Robert Bradshaw
Could you clarify what you mean by "inconsistent" and "incorrect"? Are
elements missing/duplicated, or just batched differently?

On Fri, Aug 9, 2019 at 2:18 AM rahul patwari  wrote:
>
> I only ran in Direct runner. I will run in other runners and let you know the 
> results.
> I am not setting "streaming" when executing.
>
> On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
>>
>> Have you tried running this on more than one runner (e.g. Dataflow, Flink, 
>> Direct)?
>>
>> Are you setting --streaming when executing?
>>
>> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari  
>> wrote:
>>>
>>> Hi,
>>>
>>> I am getting inconsistent results when using GroupIntoBatches PTransform.
>>> I am using Create.of() PTransform to create a PCollection from in-memory. 
>>> When a coder is given with Create.of() PTransform, I am facing the issue.
>>> If the coder is not provided, the results are consistent and correct (Maybe 
>>> this is just a coincidence and the problem is at some other place).
>>> If Batch Size is 1, results are always consistent.
>>>
>>> Not sure if this is an issue with Serialization/Deserialization (or) 
>>> GroupIntoBatches (or) Create.of() PTransform.
>>>
>>> The Java code, expected correct results, and inconsistent results are 
>>> available at https://github.com/rahul8383/beam-examples
>>>
>>> Thanks,
>>> Rahul


Re: (mini-doc) Beam (Flink) portable job templates

2019-08-08 Thread Robert Bradshaw
On Wed, Aug 7, 2019 at 5:59 PM Thomas Weise  wrote:
>
>> > * The pipeline construction code itself may need access to cluster 
>> > resources. In such cases the jar file cannot be created offline.
>>
>> Could you elaborate?
>
>
> The entry point is arbitrary code written by the user, not limited to Beam 
> pipeline construction alone. For example, there could be access to a file 
> system or other service to fetch metadata that is required to build the 
> pipeline. Such services can be accessed when the code runs within the 
> infrastructure, but typically not in a development environment.

Yes, this may be limited to the case where the pipeline construction
can be done on the user's machine before submission (remotely staging
and executing the Python (or Go, or ...) code within the
infrastructure to build the pipeline and then running the job server
there is a bit more complicated). We control the entry point from then
on.

>> > * For k8s deployment, a container image with the SDK and application code 
>> > is required for the worker. The jar file (which is really a derived 
>> > artifact) would need to be built in addition to the container image.
>>
>> Yes. For standard use, a vanilla released Beam published SDK container
>> + staged artifacts should be sufficient.
>>
>> > * To build such jar file, the user would need a build environment with job 
>> > server and application code. Do we want to make that assumption?
>>
>> Actually, it's probably much easier than that. A jar file is just a
>> zip file with a standard structure, to which one can easily add (data)
>> files without having a full build environment. The (pre-compiled) main
>> class would know how to read this data to construct the pipeline and
>> kick off the job just like any other Flink job.
>
> Before assembling the jar, the job server runs to create the ingredients. 
> That requires the (matching) Java environment on the Python developer's 
> machine.

We can run the job server and have it create the jar (and if we keep
the job server running we can use it to interact with the running
job). However, if the jar layout is simple enough, there's no need to
even build it from Java.

Taken to the extreme, this is a one-shot, jar-based JobService API. We
choose a standard layout of where to put the pipeline description and
artifacts, and can "augment" an existing jar (that has a
runner-specific main class whose entry point knows how to read this
data to kick off a pipeline as if it were a users driver code) into
one that has a portable pipeline packaged into it for submission to a
cluster.
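
A sketch of how little tooling that "augmenting" step could need, assuming
a hypothetical convention for the in-jar paths (the zipfile module is
enough; no Java build environment required):

import shutil
import zipfile

# Start from a released runner jar whose main class knows the convention.
shutil.copyfile("beam-runner-job-template.jar", "my-pipeline.jar")

# A jar is just a zip: append the pipeline proto and artifacts at the
# agreed-upon locations (these paths are assumptions for illustration).
with zipfile.ZipFile("my-pipeline.jar", "a") as jar:
    jar.write("pipeline.pb", "BEAM-PIPELINE/pipeline.pb")
    jar.write("artifacts/dep.tar.gz", "BEAM-ARTIFACTS/dep.tar.gz")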


Re: [PROPOSAL] An initial Schema API in Python

2019-08-08 Thread Robert Bradshaw
On Wed, Aug 7, 2019 at 11:12 PM Brian Hulette  wrote:
>
> Thanks for all the suggestions, I've added responses inline.
>
> On Wed, Aug 7, 2019 at 12:52 PM Chad Dombrova  wrote:
>>
>> There’s a lot of ground to cover here, so I’m going to pull from a few 
>> different responses.
>>
>> 
>>
>> numpy ints
>>
>> A properly written library should accept any type implementing the __int__ (or 
>> __index__) methods in place of an int, rather than doing explicit type checks
>>
>> Yes, but the reality is that very very few actually do this, including Beam 
>> itself (check the code for Timestamp and Duration, to name a few).
>>
>> Which brings me to my next topic:
>>
>> I tested this out with mypy and it would not be compatible:
>>
>> import numpy as np
>>
>> def square(x: int):
>>     return x*x
>>
>> square(np.int16(32))  # mypy error
>>
>> The proper way to check this scenario is using typing.SupportsInt. Note that 
>> this only guarantees that __int__ exists, so you still need to cast to int 
>> if you want to do anything with the object:
>>
>> import typing
>>
>> def square(x: typing.SupportsInt) -> int:
>>     if not isinstance(x, int):
>>         x = int(x)
>>     return x*x
>>
>> square('foo')  # error!
>> square(1.2)  # ok
>
>  Yep I came across this while writing my last reply. I agree though it seems 
> unlikely that many libraries actually do this.
>
>> 
>>
>> Native python ints
>>
>> Agreed on float since it seems to trivially map to a double, but I’m torn on 
>> int still. While I do want int type hints to work, it doesn’t seem 
>> appropriate to map it to AtomicType.INT64, since it has a completely 
>> different range of values.
>>
>> Let’s say we used native int for the runtime field type, not just as a 
>> schema declaration for numpy.int64. What is the real world fallout from 
>> this? Would there be data loss?
>
> I'm not sure I follow the question exactly, what is the interplay between int 
> and numpy.int64 in this scenario? Are you saying that np.int64 is used in the 
> schema declaration, but we just use native int at runtime, and check the bit 
> width when encoding?
>
> In any case, I don't think the real world fallout of using int is nearly that 
> dire. I suppose data loss is possible if a poorly designed pipeline overflows 
> an int64 and crashes,

The primary risk is that it *won't* crash when overflowing an int64,
it'll just silently give the wrong answer. That's much less safe than
using a native int and then actually crashing in the case it's too
large at the point one tries to encode it.

> but that's possible whether we use int or np.int64 at runtime. I'm just 
> saying that a user could be forgiven for thinking that they're safe from 
> overflows if they declare a schema as NamedTuple('Foo', 
> [('to_infinity_and_beyond', int)]), but they shouldn't make the same mistake 
> when they explicitly call it an int64.

Yes. But for schemas to be maximally useful, we'll want to be able to
infer them from all sorts of things that aren't written with Beam in
mind (e.g. external data classes, function annotations) and rejecting
the builtin int type will be a poor user experience here.
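
For instance, a class like the following, written without Beam in mind,
should ideally be accepted by schema inference (the mappings in the
comments reflect the proposal under discussion, not settled behavior):

from typing import NamedTuple

class Word(NamedTuple):
    name: str         # STRING
    rank: int         # would map to INT64
    frequency: float  # would map to DOUBLE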

>> 
>>
>> Python3-only
>>
>> No need to worry about 2/3 compatibility for strings, we could just use str
>>
>> This is already widely handled throughout the Beam python SDK using the 
>> future/past library, so it seems silly to give up on this solution for 
>> schemas.
>>
>> On this topic, I added some comments to the PR about using 
>> past.builtins.unicode instead of numpy.unicode. They’re the same type, but 
>> there’s no reason to get this via numpy, when everywhere else in the code 
>> gets it from past.
>>
>> We could just use bytes for byte arrays (as a shorthand for 
>> typing.ByteString [1])
>>
>> Neat, but in my obviously very biased opinion it is not worth cutting off 
>> python2 users over this.
>
> Ok I won't do this :) I wasn't aware of typing.Sequence, that does seem like 
> a good fit. The other two items are just nice-to-haves, I'm happy to work 
> around those and use Sequence for arrays instead.

I would imagine that we could accept bytes or typing.ByteString for
BYTES, with only Python 2 users having to do the latter. (In both
Python 2 and Python 3 one would use str for STRING, it would decode to
past.builtins.unicode. This seems to capture the intent better than
mapping str to BYTES in Python 2 only.)


Re: Brief of interactive Beam

2019-08-08 Thread Robert Bradshaw
Thanks for the note. Are there any associated documents worth sharing as
well? More below.

On Wed, Aug 7, 2019 at 9:39 PM Ning Kang  wrote:

> To whom may concern,
>
> This is Ning from Google. We are currently making efforts to leverage an
> interactive runner under the Python Beam SDK.
>
> There is already an interactive Beam (iBeam for short) runner with jupyter
> notebook in the repo.
> Following the instructions on that page, one can set up an interactive
> environment to develop and execute Beam pipeline interactively.
>
> However, there are many issues with existing iBeam. One issue is that it
> uses a concept of leaf PCollection to cache and materialize intermediate
> PCollection. If the user wants to reuse/introspect a non-leaf PCollection,
> the interactive runner will run into errors.
>
> Our initial effort will be fixing the existing issues. And we also want to
> make iBeam easy to use. Since iBeam uses the same model Beam uses, there
> isn't really any difference for users between creating a pipeline with
> interactive runner and other runners.
> So we want to minimize the interfaces a user needs to learn while giving
> the user some capability to interact with the interactive environment.
>
> See this initial PR, the
> interactive_beam module will provide mainly 4 interfaces:
>
>- For advanced users who define pipeline outside __main__, let them
>tell current interactive environment where they define their pipeline:
>watch()
>   - This is very useful for tests where pipeline can be defined in
>   test methods.
>   - If the user simply creates pipeline in a Jupyter notebook or a
>   plain Python script, they don't have to know/use this feature at all.
>
>
This is for using visualize() below, or building further on the pipeline,
right?


>
>- Let users create an interactive pipeline: create_pipeline()
>   - invoking create_pipeline(), the user gets a Pipeline object that
>   works as any other Pipeline object created from apache_beam.Pipeline()
>   - However, the pipeline object p, when invoking p.run(), does some
>   extra interactive magic.
>   - We'll support interactive execution for DirectRunner at this
>   moment.
>
> How is this different than creating a pipeline with the interactive
runner? It'd be nice to reduce the number of new concepts a user needs to
know (and also reduce the number of changes needed to move from interactive
to non-interactive). Is there any need to limit this to the Direct runner?

>
>- Let users run the interactive pipeline as a normal pipeline:
>run_pipeline()
>   - In an interactive environment, a user only needs to add and
>   execute 1 line of code run_pipeline(pipeline) to execute any existing
>   interactive pipeline object as normal pipeline in any selected platform.
>   - We'll probably support Dataflow only. Other implementations can
>   be added though.
>
> Again, how is this different than pipeline.run()? What features require
limiting this to only certain runners?

>
>- Let users introspect any intermediate PCollection they have handler
>to: visualize()
>   - If a user ever writes pcoll = p | "Some Transform" >>
>   some_transform() ..., they can visualize(pcoll) once the pipeline p is
>   executed.
>   - p can be batch or streaming
>   - The visualization will be some plot graph of data for the given
>   PCollection as if it's materialized. If the PCollection is unbounded, 
> the
>   graph is dynamic.
>
> The PR will implement 1 and 2.
>
> We'll use https://issues.apache.org/jira/browse/BEAM-7923 as the top
> level JIRA and add blocking JIRAs as development goes.
>
> External Beam users will not worry about any of the underlying
> implementation details.
> Apart from the 4 interfaces above, they learn and write normal Beam code and
> can execute the pipeline immediately when they are done with prototyping.
>
> Ning.
>
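
For reference while discussing the above, here is a hypothetical
end-to-end usage of the four proposed interfaces (the module path and
exact signatures are my assumptions based on the description):

import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib  # assumed path

p = ib.create_pipeline()  # like beam.Pipeline(), plus interactive magic on run()
pcoll = p | "Some Transform" >> beam.Create(["a", "b", "c"])
p.run()              # interactive execution, caching intermediate PCollections
ib.visualize(pcoll)  # introspect a non-leaf PCollection
ib.run_pipeline(p)   # execute as a normal pipeline on a selected platform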


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-07 Thread Robert Bradshaw
 timestamp, window and pane 
>>> > properties in Flink. But currently FullWindowedValueCoder is used by 
>>> > default in WireCoders.addWireCoder, I suggest to make the coder 
>>> > configurable (i.e. allowing to use ValueOnlyWindowedValueCoder)
>>> >
>>> > 5) Currently if a coder is not defined in StandardCoders, it will be 
>>> > wrapped with LengthPrefixedCoder (WireCoders.addWireCoder -> 
>>> > LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few 
>>> > coders are defined in StandardCoders. It means that for most coder, a 
>>> > length will be added to the serialized bytes which is not necessary in my 
>>> > thoughts. My suggestion is maybe we can add some interfaces or tags for 
>>> > the coder which indicates whether the coder needs a length prefix or 
>>> > not.
>>> >
>>> > 6) Set log level according to PipelineOption in Python SDK Harness. 
>>> > Currently the log level is set to INFO by default.
>>> >
>>> > 7) Allows to start up StatusServer according to PipelineOption in Python 
>>> > SDK Harness. Currently the StatusServer is started up by default.
>>> >
>>> > Although I put 3) 4) 5) into the "Nice to Have" as they are performance 
>>> > related, I still think they are very critical for Python UDF execution 
>>> > performance.
>>> >
>>> > Open questions:
>>> > -
>>> > 1) Which coders should be / can be defined in StandardCoders?
>>> >
>>> > Currently we are preparing the design of how to support Python UDF in 
>>> > Flink based on the Beam portability framework and we will bring up the 
>>> > discussion in Flink community. We may propose more changes for Beam 
>>> > during that time and may need more support from Beam community.
>>> >
>>> > To be honest, I'm not an expert of Beam and so please feel free to 
>>> > correct me if my understanding is wrong. Welcome any feedback.
>>> >
>>> > Best,
>>> > Jincheng
>>>
>>>
>>>
>>>
>>>
>>>
>>> Cheers,
>>> Max
>>>
>>> On 31.07.19 12:16, Robert Bradshaw wrote:
>>> > Yep, Python support under active development,
>>> > e.g. https://github.com/apache/beam/pull/9188
>>> >
>>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun >> > <mailto:sunjincheng...@gmail.com>> wrote:
>>> >
>>> > Thanks a lot for sharing the link. I take a quick look at the design
>>> > and the implementation in Java and think it could address my
>>> > concern. It seems that it's still not supported in the Python SDK
>>> > Harness. Is there any plan on that?
>>> >
>>> > Robert Bradshaw mailto:rober...@google.com>>
>>> > wrote on Tue, Jul 30, 2019 at 12:33 PM:
>>> >
>>> > On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>>> > mailto:sunjincheng...@gmail.com>> 
>>> > wrote:
>>> >
>>> >
>>> > Is it possible to add an interface such as
>>> > `isSelfContained()` to the `Coder`? This interface
>>> > indicates
>>> > whether the serialized bytes are self contained. If
>>> > it returns true, then there is no need to add a
>>> > prefixing length.
>>> > In this way, there is no need to introduce an extra
>>> > protocol,  Please correct me if I missed something :)
>>> >
>>> >
>>> > The question is how it is self contained. E.g.
>>> > DoubleCoder is self contained because it always uses
>>> > exactly 8 bytes, but one needs to know the double coder
>>> > to leverage this. VarInt coder is self-contained a
>>> > different way, as is StringCoder (which does just do
>>> > prefixing).
>>> >
>>> >
>>> > Yes, you are right! Thinking about it again, we cannot add
>>> > such an interface to the coder, since the runner cannot call it.
>>> > And just one more thought: does it make sense to add a
>>> > method such as "registerSelfContained Coder(xxx)" or so to
>>> > let users register the coders which can be processed in the
>>> > SDK Harness?  It's the responsibility of the SDK harness to
>>> > ensure that the coder is supported.
>>> >
>>> >
>>> > Basically, a "please don't add length prefixing to this coder,
>>> > assume everyone else can understand it (and errors will ensue if
>>> > anyone doesn't)" at the user level? Seems a bit dangerous. Also,
>>> > there is not "the SDK"--there may be multiple other SDKs in
>>> > general, and of course runner components, some of which may
>>> > understand the coder in question and some of which may not.
>>> >
>>> > I would say that if this becomes a problem, we could look at the
>>> > pros and cons of various remedies, this being one alternative.
>>> >
>>> >
>>> >
>>> >
>>> > I am hopeful that schemas give us a rich enough way to
>>> > encode the vast majority of types that we will want to
>>> > transmit across language barriers (possibly with some
>>> > widening promotions). For high performance one will want
>>> > to use formats like arrow rather than one-off coders as
>>> > well, which also biases us towards the schema work. The
>>> > set of StandardCoders is not closed, and nor is the
>>> > possibility of figuring out a way to communicate outside
>>> > this set for a particular pair of languages, but I think
>>> > it makes sense to avoid going that direction unless we
>>> > have to due to the increased API surface area and
>>> > complexity it imposes on all runners and SDKs.
>>> >
>>> >
>>> > Great! Could you share some links about the schema work. It
>>> > seems very interesting and promising.
>>> >
>>> >
>>> > https://beam.apache.org/contribute/design-documents/#sql--schema 
>>> > and
>>> > of particular relevance https://s.apache.org/beam-schemas
>>> >
>>> >
>>> >


Re: Collecting metrics in JobInvocation - BEAM-4775

2019-08-07 Thread Robert Bradshaw
I think the question here is whether PipelineRunner::run is allowed to
be blocking. If it is, then the futures make sense (but there's no way
to properly cancel it). I'm OK with not being able to return metrics
on cancel in this case, or the case the pipeline didn't even start up
yet. Otherwise, we should quickly get a handle to the PipelineResult
and be able to query that for all future use.

On Fri, Jul 26, 2019 at 6:04 PM Kenneth Knowles  wrote:
>
> Took a look at the code, too. It seems like a mismatch in a few ways
>
>  - PipelineRunner::run is async already and returns while the job is still 
> running
>  - PipelineResult is a legacy name - it is really meant to be a handle to a 
> running job
>  - cancel() on a future is just not really related to cancel() in a job. I 
> would expect to cancel a job with PipelineResult::cancel and I would expect 
> JobInvocation::cancel to cancel the "start job" RPC/request/whatever. So I 
> would not expect metrics for a job which I decided to not even start.
>
> Kenn
>
> On Fri, Jul 26, 2019 at 8:48 AM Łukasz Gajowy  wrote:
>>
>> Hi all,
>>
>> I'm currently working on BEAM-4775. The goal here is to pass portable 
>> MetricResults over the RPC API to the PortableRunner (SDK) part and allow 
>> reading them there. The metrics can be collected from the pipeline result 
>> that is available in JobInvocation's callbacks. The callbacks are registered 
>> in start() and cancel() methods of JobInvocation. This is the place where my 
>> problems begin:
>>
>> I want to access the pipeline result and get the MetricResults from it. This 
>> is possible only in onSuccess(PipelineResult result) method of the callbacks 
>> registered in start() and cancel() in JobInvocation. Now, when I cancel the 
>> job invocation, invocationFuture.cancel() is called and will result in 
>> invoking onFailure(Throwable throwable) in case the pipeline is still 
>> running. onFailure() has no PipelineResult parameter, hence there currently 
>> is no possibility to collect the metrics there.
>>
>> My questions currently are:
>>
>> Should we collect metrics after the job is canceled? So far I assumed that 
>> we should.
>> If so, does anyone have some other ideas on how to collect metrics so that 
>> we could collect them when canceling the job?
>>
>> PR I'm working on with more discussions on the topic: PR 9020
>> The current idea on how the metrics could be collected in JobInvocation: link
>>
>> Thanks,
>> Łukasz
>>


Re: (mini-doc) Beam (Flink) portable job templates

2019-08-07 Thread Robert Bradshaw
On Wed, Aug 7, 2019 at 6:20 AM Thomas Weise  wrote:
>
> Hi Kyle,
>
> [document doesn't have comments enabled currently]
>
> As noted, worker deployment is an open question. I believe pipeline 
> submission and worker execution need to be considered together for a complete 
> deployment story. The idea of creating a self-contained jar file is 
> interesting, but there are trade-offs:
>
> * The pipeline construction code itself may need access to cluster resources. 
> In such cases the jar file cannot be created offline.

Could you elaborate?

> * For k8s deployment, a container image with the SDK and application code is 
> required for the worker. The jar file (which is really a derived artifact) 
> would need to be built in addition to the container image.

Yes. For standard use, a vanilla released Beam published SDK container
+ staged artifacts should be sufficient.

> * To build such jar file, the user would need a build environment with job 
> server and application code. Do we want to make that assumption?

Actually, it's probably much easier than that. A jar file is just a
zip file with a standard structure, to which one can easily add (data)
files without having a full build environment. The (pre-compiled) main
class would know how to read this data to construct the pipeline and
kick off the job just like any other Flink job.

> The document that I had shared discusses options for pipeline submission. It 
> might be interesting to explore if your proposal for building such a jar can 
> be integrated or if you have other comments?
>
> Thomas
>
>
>
> On Tue, Aug 6, 2019 at 5:03 PM Kyle Weaver  wrote:
>>
>> Hi all,
>>
>> Following up on discussion about portable Beam on Flink on Kubernetes [1], I 
>> have drafted a short document on how I propose we bundle portable Beam 
>> applications into jars that can be run on OSS runners, similar to Dataflow 
>> templates (but without the actual template part, at least for the first 
>> iteration). It's pretty straightforward, but I thought I would broadcast it 
>> here in case anyone is interested.
>>
>> https://docs.google.com/document/d/1kj_9JWxGWOmSGeZ5hbLVDXSTv-zBrx4kQRqOq85RYD4/edit#
>>
>> [1] 
>> https://lists.apache.org/thread.html/a12dd939c4af254694481796bc08b05bb1321cfaadd1a79cd3866584@%3Cdev.beam.apache.org%3E
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


Re: [PROPOSAL] An initial Schema API in Python

2019-08-06 Thread Robert Bradshaw
On Sun, Aug 4, 2019 at 12:03 AM Chad Dombrova  wrote:
>
> Hi,
>
> This looks like a great feature.
>
> Is there a plan to eventually support custom field types?
>
> I assume adding support for dataclasses in python 3.7+ should be trivial to 
> do in a follow up PR. Do you see any complications with that? The main 
> advantage that dataclasses have over NamedTuple in this context is argument 
> defaults, which is a nice convenience.

Java has a notion of logical types which has yet to be figured out in
a cross-language way but tackles this exact issue. I think there's a
lot of value in "anonymous" named tuples as intermediates as well, e.g.
one might do a projection onto a subset of fields, and then do a
grouping/aggregating operation, in which case the new schema can be
inferred (even if it doesn't have a name).

> My PR as it is right now actually doesn’t even support int. I probably should 
> at least make a change to accept int as a type specification for int64 but 
> throw an error when encoding if an int is too big.
>
> Should probably do the same for float.
>
> Another concern I have is, if there is a user function or a library that user 
> does not control, that uses typing to indicate that a function accepts a type 
> of int, would it be compatible with numpy types?
>
> I have similar concerns. I guess we’ll just have to cast to int before 
> passing into 3rd party code, which is not ideal. Why not use int for int64 in 
> python?

A properly written library should accept any type implementing the
__int__ (or __index__) methods in place of an int, rather than doing
explicit type checks, though performance may suffer. Likewise when
encoding, we should accept all sorts of ints when an int32 (say) is
expected, rather than force the user to know and cast to the right
type.

As for the mappings between Python types and schemas, there are
several mappings that are somewhat being conflated.

(1) There is the mapping used in definitions. At the moment, all
subclasses of NamedTuple map to the same generic Row schema type,
probably not something we want in the long run (but could be OK for
now if we think doing better in the future counts as backwards
compatible). For integral types, it makes sense to accept
np.int{8,16,32,64}, but should we accept the equivalent arrow types
here as well? I think we also need to accept the plain Python "int"
and "float" type, otherwise a standard Python class like

NamedTuple('Word', [('name', str), ('rank', int), ('frequency', float)])

will be surprisingly rejected.

(2) The mapping of concrete values to Python types. Rows mapping to
NamedTuples may give expectations beyond the attributes they offer
(and I'd imagine we'll want to be flexible with the possible
representations here, e.g. offering a slice of an arrow record batch).
Or do we need to pay the cost of re-creating the user's NamedTuple
subclass? Ints are another interesting case--it may be quite
surprising to users for the returned values to have silent truncating
overflow semantics (very unlike Python) rather than the arbitrary
precision that Python's ints give (especially if the conversion from a
python int to an int64 happens due to an invisible fusion barrier).
Better to compute the larger value and then thrown an error if/when it
is encoded into a fixed width type later.

(3) The mapping of Python values into a row (e.g. for serialization).
If there are errors (e.g. a DoFn produces tuples of the wrong type),
how eagerly can we detect them? At what cost? How strict should we be
(e.g. if a named tuple with certain fields is expected, can we map a
concrete subclass to it? A NamedTuple that has a superset of the
fields? Implicitly mapping Python's float (aka a 64-bit C double) to a
float32 is a particularly sticky question.

I think we can make forward progress on implementation in parallel to
answering these questions, but we should be explicit and document what
the best options are here and then get the code to align.
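
As one concrete option for (2) and (3), a sketch of "compute with Python
ints, fail loudly at encoding time" (encode_int64 is illustrative, not an
existing Beam function):

def encode_int64(v: int) -> bytes:
    # Raise rather than silently wrap when the value does not fit,
    # keeping Python semantics right up to the fixed-width boundary.
    if not -(2 ** 63) <= v < 2 ** 63:
        raise OverflowError("%d does not fit in an int64 field" % v)
    return v.to_bytes(8, "big", signed=True)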


Re: [ANNOUNCE] Beam 2.14.0 Released!

2019-08-02 Thread Robert Bradshaw
Lots of improvements all around. Thank you for pushing this through, Anton!

On Fri, Aug 2, 2019 at 1:37 AM Chad Dombrova  wrote:
>
> Nice work all round!  I love the release blog format with the highlights and 
> links to issues.
>
> -chad
>
>
> On Thu, Aug 1, 2019 at 4:23 PM Anton Kedin  wrote:
>>
>> The Apache Beam team is pleased to announce the release of version 2.14.0.
>>
>> Apache Beam is an open source unified programming model to define and
>> execute data processing pipelines, including ETL, batch and stream
>> (continuous) processing. See https://beam.apache.org
>>
>> You can download the release here:
>>
>> https://beam.apache.org/get-started/downloads/
>>
>> This release includes bugfixes, features, and improvements detailed on
>> the Beam blog: https://beam.apache.org/blog/2019/07/31/beam-2.14.0.html
>>
>> Thanks to everyone who contributed to this release, and we hope you enjoy
>> using Beam 2.14.0.
>>
>> -- Anton Kedin, on behalf of The Apache Beam team


Re: [ANNOUNCE] New committer: Jan Lukavský

2019-08-01 Thread Robert Bradshaw
Congratulations!

On Thu, Aug 1, 2019 at 9:59 AM Jan Lukavský  wrote:

> Thanks everyone!
>
> Looking forward to working with this great community! :-)
>
> Cheers,
>
>  Jan
> On 8/1/19 12:18 AM, Rui Wang wrote:
>
> Congratulations!
>
>
> -Rui
>
> On Wed, Jul 31, 2019 at 10:51 AM Robin Qiu  wrote:
>
>> Congrats!
>>
>> On Wed, Jul 31, 2019 at 10:31 AM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>>> Congratulations, Jan! Thank you for your contributions!
>>>
>>> On Wed, Jul 31, 2019 at 10:04 AM Tanay Tummalapalli 
>>> wrote:
>>>
 Congratulations!

 On Wed, Jul 31, 2019 at 10:05 PM Ahmet Altay  wrote:

> Congratulations Jan! Thank you for your contributions!
>
> On Wed, Jul 31, 2019 at 2:30 AM Ankur Goenka 
> wrote:
>
>> Congratulations Jan!
>>
>> On Wed, Jul 31, 2019, 1:23 AM David Morávek  wrote:
>>
>>> Congratulations Jan, well deserved! ;)
>>>
>>> D.
>>>
>>> On Wed, Jul 31, 2019 at 10:17 AM Ryan Skraba 
>>> wrote:
>>>
 Congratulations Jan!

 On Wed, Jul 31, 2019 at 10:10 AM Ismaël Mejía 
 wrote:
 >
 > Hi,
 >
 > Please join me and the rest of the Beam PMC in welcoming a new
 > committer: Jan Lukavský.
 >
 > Jan has been contributing to Beam for a while, he was part of the
 team
 > that contributed the Euphoria DSL extension, and he has done
 > interesting improvements for the Spark and Direct runner. He has
 also
 > been active in the community discussions around the Beam model and
 > other subjects.
 >
 > In consideration of Jan's contributions, the Beam PMC trusts him
 with
 > the responsibilities of a Beam committer [1].
 >
 > Thank you, Jan, for your contributions and looking forward to
 many more!
 >
 > Ismaël, on behalf of the Apache Beam PMC
 >
 > [1] https://beam.apache.org/committer/committer

>>>


Re: [DISCUSS] Integer coders used in SchemaCoder

2019-07-31 Thread Robert Bradshaw
The standard VARINT coder is used for all sorts of integer values (e.g. the
output of the CountElements transform), but the vast majority of them are
likely significantly less than a full 64 bits. In Python, declaring an
element type to be int will use this. On the other hand, using a VarInt
format for int8 seems quite wasteful. Where exactly the cutoff lies is
probably arbitrary, but the Java 32-bit int type is often used as the generic (and
often small-ish) integer type in Java, whereas int16 is an explicit choice
where one knows that 16 bits is good enough, but 8 isn't.

It looks like Go uses the VarInt encoding everywhere:
https://github.com/apache/beam/blob/release-2.14.0/sdks/go/pkg/beam/coder.go#L135
. Python, as mentioned, uses VarInt encoding everywhere as well.

(There's also the question of whether we want to introduce StandardCoders
for all of these, or if we'd rather move to using Schemas over Coders and
just define them as part of the RowCoder.)
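
For intuition on the trade-off, a sketch of the base-128 varint scheme
(ignoring the negative-number handling the real VarInt coders need):

def varint_encode(n: int) -> bytes:
    # 7 payload bits per byte; the high bit means "more bytes follow".
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

len(varint_encode(42))       # 1 byte, vs. 8 for a fixed-width long
len(varint_encode(2 ** 60))  # 9 bytes, worse than fixed-width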




On Tue, Jul 30, 2019 at 8:30 PM Brian Hulette  wrote:

> Forgot to include a link to the code. The mapping from primitive type to
> coders can be found here:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L44
>
> On Tue, Jul 30, 2019 at 11:24 AM Brian Hulette 
> wrote:
>
>> Currently the coders used for integer types in RowCoder (and thus
>> SchemaCoder) are inconsistent. For int32 and int64, we use VarIntCoder and
>> VarLongCoder which encode those types with variable width, but for byte and
>> int16 we use ByteCoder and BigEndianShortCoder, which are fixed width.
>>
>> Is it a conscious choice to use variable width coders just for the larger
>> width integers (where they could have the most benefit), or should we
>> consider normalizing these coders to always be fixed width?
>>
>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-31 Thread Robert Bradshaw
Yep, Python support under active development, e.g.
https://github.com/apache/beam/pull/9188

On Wed, Jul 31, 2019 at 9:24 AM jincheng sun 
wrote:

> Thanks a lot for sharing the link. I take a quick look at the design and
> the implementation in Java and think it could address my concern. It seems
> that it's still not supported in the Python SDK Harness. Is there any plan
> on that?
>
>> Robert Bradshaw  wrote on Tue, Jul 30, 2019 at 12:33 PM:
>
>> On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
>> wrote:
>>
>>>
>>>>> Is it possible to add an interface such as `isSelfContained()` to the
>>>>> `Coder`? This interface indicates
>>>>> whether the serialized bytes are self contained. If it returns true,
>>>>> then there is no need to add a prefixing length.
>>>>> In this way, there is no need to introduce an extra protocol,  Please
>>>>> correct me if I missed something :)
>>>>>
>>>>
>>>> The question is how it is self contained. E.g. DoubleCoder is self
>>>> contained because it always uses exactly 8 bytes, but one needs to know the
>>>> double coder to leverage this. VarInt coder is self-contained a different
>>>> way, as is StringCoder (which does just do prefixing).
>>>>
>>>
>>> Yes, you are right! Thinking about it again, we cannot add such an interface
>>> to the coder, since the runner cannot call it. And just one more thought:
>>> does it make sense to add a method such as "registerSelfContained
>>> Coder(xxx)" or so to let users register the coders which can be processed
>>> in the SDK Harness?  It's the responsibility of the SDK harness to ensure
>>> that the coder is supported.
>>>
>>
>> Basically, a "please don't add length prefixing to this coder, assume
>> everyone else can understand it (and errors will ensue if anyone doesn't)"
>> at the user level? Seems a bit dangerous. Also, there is not "the
>> SDK"--there may be multiple other SDKs in general, and of course runner
>> components, some of which may understand the coder in question and some of
>> which may not.
>>
>> I would say that if this becomes a problem, we could look at the pros and
>> cons of various remedies, this being one alternative.
>>
>>
>>>
>>>
>>>> I am hopeful that schemas give us a rich enough way to encode the vast
>>>> majority of types that we will want to transmit across language barriers
>>>> (possibly with some widening promotions). For high performance one will
>>>> want to use formats like arrow rather than one-off coders as well, which
>>>> also biases us towards the schema work. The set of StandardCoders is not
>>>> closed, and nor is the possibility of figuring out a way to communicate
>>>> outside this set for a particular pair of languages, but I think it makes
>>>> sense to avoid going that direction unless we have to due to the increased
>>>> API surface area and complexity it imposes on all runners and SDKs.
>>>>
>>>
>>> Great! Could you share some links about the schema work. It seems very
>>> interesting and promising.
>>>
>>
>> https://beam.apache.org/contribute/design-documents/#sql--schema and of
>> particular relevance https://s.apache.org/beam-schemas
>>
>>
>>
>


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-31 Thread Robert Bradshaw
On Wed, Jul 31, 2019 at 11:22 AM Valentyn Tymofieiev 
wrote:

> I have checked Portable Wordcount example on Flink and Spark on Python 2
> and Python 3.
>
> To do so, I had to check out Beam from the git repo, since the source
> distribution does not include gradlew, and gradlew_orig did not work for
> me. Commands I ran:
>
> git checkout tags/v2.14.0-RC1
> ./gradlew :sdks:python:container:py3:docker
> ./gradlew :runners:flink:1.5:job-server:runShadow  # Use ./gradlew
> :runners:spark:job-server:runShadow for Spark
> ./gradlew :sdks:python:test-suites:portable:py35:portableWordCountBatch
>  -PjobEndpoint=localhost:8099 -PenvironmentType=LOOPBACK
> cat /tmp/py-wordcount-direct* # to verify results.
>
> Loopback scenarios worked, however DOCKER scenarios did not. Opened
> several Jiras to follow up:
>
> https://issues.apache.org/jira/browse/BEAM-7857
> https://issues.apache.org/jira/browse/BEAM-7858
> https://issues.apache.org/jira/browse/BEAM-7859
>

I commented on the bugs, and I think this is due to trying to use Docker
mode with local files (a known issue).


> The gradle targets that were required to run these tests are not present
> in 2.13.0 branch, so I don't consider it a regression and still cast +1.
>

Agreed.


> On Wed, Jul 31, 2019 at 11:31 AM Ismaël Mejía  wrote:
>
>> Oops, Robert pointed out to me that I probably did not count correctly.
>> There were indeed already 3 PMC +1 votes. Pablo, Robert and Ahmet.
>> Please excuse me for the extra noise.
>>
>> On Wed, Jul 31, 2019 at 9:46 AM Ismaël Mejía  wrote:
>> >
>> > To complete the release we need to have at least three +1 binding
>> > votes (votes from PMC members) as stated in [1]. So far we have only
>> > 2.
>> >
>> > Thomas (and the others). The blog post PR is now open [2] please help
>> > us add missing features or maybe to highlight the ones you consider
>> > important in the PR comments.
>> >
>> > Here it is the missing +1 (binding). Validated SHAs+signatures,
>> > beam-samples and one internal company project with the new jars.
>> > Compared source file vs tagged git repo. Everything looks ok.
>> >
>> > [1] https://www.apache.org/foundation/voting.html#ReleaseVotes
>> > [2] https://github.com/apache/beam/pull/9201/files
>> >
>> > On Wed, Jul 31, 2019 at 6:27 AM Anton Kedin  wrote:
>> > >
>> > > Ran various postcommits, validates runners, and nexmark against the
>> release branch. All looks good so far.
>> > >
>> > > Will take another look at the docs/blog and the nexmark numbers
>> tomorrow, but if nothing comes up I will close the vote tomorrow
>> (Wednesday) by 6pm PST (= Thursday 01:00am UTC) since it's over 72hours
>> since the vote has started and we have a number of +1s including PMC
>> members and no -1s.
>> > >
>> > > Regards,
>> > > Anton
>> > >
>> > > On Tue, Jul 30, 2019 at 8:13 PM Valentyn Tymofieiev <
>> valen...@google.com> wrote:
>> > >>
>> > >> I also ran unit tests for Python 3.7 and they passed as well. Cython
>> tests for python3.7 require  `apt-get install python3.7-dev`.
>> > >>
>> > >> On Wed, Jul 31, 2019 at 3:16 AM Pablo Estrada 
>> wrote:
>> > >>>
>> > >>> +1
>> > >>>
>> > >>> I installed from source, and ran unit tests for Python in 2.7, 3.5,
>> 3.6.
>> > >>>
>> > >>> Also ran a number of integration tests on Py 3.5 on Dataflow and
>> DirectRunner.
>> > >>> Best
>> > >>> -P.
>> > >>>
>> > >>> On Tue, Jul 30, 2019 at 11:09 AM Hannah Jiang <
>> hannahji...@google.com> wrote:
>> > >>>>
>> > >>>> I checked Py3 tests using .zip, mainly with direct runners, and
>> everything looks good, so +1.
>> > >>>>
>> > >>>> On Tue, Jul 30, 2019 at 2:08 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> > >>>>>
>> > >>>>> I checked all the artifact signatures and ran a couple test
>> pipelines with the wheels (Py2 and Py3) and everything looked good to me,
>> so +1.
>> > >>>>>
>> > >>>>> On Mon, Jul 29, 2019 at 8:29 PM Valentyn Tymofieiev <
>> valen...@google.com> wrote:
>> > >>>>>>
>> > >>>>>> I have 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-30 Thread Robert Bradshaw
On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
wrote:

>
>>> Is it possible to add an interface such as `isSelfContained()` to the
>>> `Coder`? This interface indicates
>>> whether the serialized bytes are self contained. If it returns true,
>>> then there is no need to add a prefixing length.
>>> In this way, there is no need to introduce an extra protocol,  Please
>>> correct me if I missed something :)
>>>
>>
>> The question is how it is self contained. E.g. DoubleCoder is self
>> contained because it always uses exactly 8 bytes, but one needs to know the
>> double coder to leverage this. VarInt coder is self-contained a different
>> way, as is StringCoder (which does just do prefixing).
>>
>
> Yes, you are right! Thinking about it again, we cannot add such an interface
> to the coder, since the runner cannot call it. And just one more thought:
> does it make sense to add a method such as "registerSelfContained
> Coder(xxx)" or so to let users register the coders which can be processed
> in the SDK Harness?  It's the responsibility of the SDK harness to ensure
> that the coder is supported.
>

Basically, a "please don't add length prefixing to this coder, assume
everyone else can understand it (and errors will ensue if anyone doesn't)"
at the user level? Seems a bit dangerous. Also, there is not "the
SDK"--there may be multiple other SDKs in general, and of course runner
components, some of which may understand the coder in question and some of
which may not.

I would say that if this becomes a problem, we could look at the pros and
cons of various remedies, this being one alternative.


>
>
>> I am hopeful that schemas give us a rich enough way to encode the vast
>> majority of types that we will want to transmit across language barriers
>> (possibly with some widening promotions). For high performance one will
>> want to use formats like arrow rather than one-off coders as well, which
>> also biases us towards the schema work. The set of StandardCoders is not
>> closed, and nor is the possibility of figuring out a way to communicate
>> outside this set for a particular pair of languages, but I think it makes
>> sense to avoid going that direction unless we have to due to the increased
>> API surface area and complexity it imposes on all runners and SDKs.
>>
>
> Great! Could you share some links about the schema work. It seems very
> interesting and promising.
>

https://beam.apache.org/contribute/design-documents/#sql--schema and of
particular relevance https://s.apache.org/beam-schemas


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-30 Thread Robert Bradshaw
I checked all the artifact signatures and ran a couple test pipelines with
the wheels (Py2 and Py3) and everything looked good to me, so +1.

On Mon, Jul 29, 2019 at 8:29 PM Valentyn Tymofieiev 
wrote:

> I have checked Python 3 batch and streaming quickstarts on Dataflow runner
> using .zip and wheel distributions. So far +1 from me.
>
> On Mon, Jul 29, 2019 at 7:53 PM Ahmet Altay  wrote:
>
>> +1, validated python 2 quickstarts.
>>
>> On Fri, Jul 26, 2019 at 5:46 PM Ahmet Altay  wrote:
>>
>>> To confirm, I manuall validated leader board on python. It is working.
>>>
>>> On Fri, Jul 26, 2019 at 5:23 PM Yifan Zou  wrote:
>>>
 AFAIK, there should not be any special prerequisites for this. Things
 the script does including:
 1. download the python rc in zip
 2. start virtualenv and install the sdk.
 3. verify hash.
 4. config settings.xml and start a Java pubsub message injector.
 5. run game examples and validate.

 Could you double check if the sdk was installed properly (step 1&2)?

>>>
>>> I also guessing this is the case. Probably something earlier in the
>>> validation script did not run as expected.
>>>
>>>


>>> Yifan

 On Fri, Jul 26, 2019 at 2:38 PM Anton Kedin  wrote:

> Validation script fails for me when I try to run [1] python
> leaderboard with direct runner:
>
> ```
> *
> * Running Python Leaderboard with DirectRunner
> *
> /usr/bin/python: No module named apache_beam.examples.complete.game
> ```
>
> If someone has more context, what are the prerequisites for this step?
> How does it look up the module?
>
> [1]
> https://github.com/apache/beam/blob/master/release/src/main/scripts/run_rc_validation.sh#L424
>
> Regards,
> Anton
>
> On Fri, Jul 26, 2019 at 10:23 AM Anton Kedin  wrote:
>
>> Cool, will make the post and will update the release guide as well
>> then
>>
>> On Fri, Jul 26, 2019 at 10:20 AM Chad Dombrova 
>> wrote:
>>
>>> I think the release guide needs to be updated to remove the
 optionality of blog creation and avoid confusion. Thanks for pointing 
 that
 out.

>>>
>>> +1
>>>
>>>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread Robert Bradshaw
On Mon, Jul 29, 2019 at 4:14 PM jincheng sun 
wrote:

> Hi Robert,
>
> Thanks for your detail comments, I would have added a few pointers inline.
>
> Best,
> Jincheng
>
> Robert Bradshaw  于2019年7月29日周一 下午12:35写道:
>
>> On Sun, Jul 28, 2019 at 6:51 AM jincheng sun 
>> wrote:
>> >
>> > Hi, Thomas & Robert, Thanks for your comments and providing relevant
>> discussions and JIRA links, very helpful to me!
>> >
>> > I am glad to see your affirmative response,  And I am glad to add my
>> thoughts on the comment you left:
>> > -
>> >
>> > >> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the runner's
>> API. For example, BagState, ValueState, ReducingState, CombiningState,  can
>> all be implemented on top of a runner-offered MapState in the SDK. On the
>> one hand, there's a desire to keep the number of "primitive" states types
>> to a minimum (to ease the use of authoring runners), but if a runner can
>> perform a specific optimization due to knowing about the particular state
>> type it might be preferable to pass it through rather than emulate it in
>> the SDK.
>> > ---
>> > Agree. Regarding MapState, it's definitely needed as it cannot be
>> implemented on top of the existing BagState.
>> > Regarding ValueState, it can be implemented on top of BagState.
>> However, we can do optimization if we know a state is ValueState.
>> > For example, if a key is updated with a new value, if the ValueState is
>> implemented on top of BagState, two RPC calls are needed
>> > to write the new value back to runner: clear + append; if we know it's
>> ValueState, just one RPC call is enough: set.
>> > We can discuss case by case whether a state type is needed.
>>
>> In the Beam APIs [1] multiple state requests are consumed as a stream
>> in a single RPC, so clear followed by append still has low overhead.
>> Is that optimization not sufficient?
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573
>>
>>
> Actually there are two kinds of overhead:
> 1) the RPC overhead (I think the current protocol may be sufficient on this point)
> 2) the state read/write overhead, i.e., if there is no optimization, the
> runner needs to clear the state first and then set a new value for the
> state.
>

It's certainly an option to keep open. I'd avoid prematurely optimizing
until we have evidence that it matters.
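
For reference, a minimal sketch in the Python SDK of the clear-plus-append
pattern under discussion, i.e. emulating a set-style value state on top of
BagState (illustrative only, not a proposed API):

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec

class LatestValueFn(beam.DoFn):
  # Emulates a single-value state per key using a bag.
  VALUE = BagStateSpec('value', VarIntCoder())

  def process(self, element, value=beam.DoFn.StateParam(VALUE)):
    key, new_value = element
    value.clear()         # first state operation: clear the bag
    value.add(new_value)  # second state operation: append the new value
    yield key, new_value

A ValueState primitive would collapse those two operations into a single set.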


>
> > ---
>> >
>> > >> Note that in the protos, the GRPC ports have a coder attribute
>> > specifically to allow this kind of customization (and the SDKs should
>> > be respecting that). We've also talked about going beyond per-element
>> > encodings (e.g. using arrow to serialize entire batches across the
>> > wire). I think all the runner code simply uses the default and we
>> > could be more intelligent there.
>> > ---
>> >
>> > Yes, gRPC allows the use of a customized coder. However, I'm afraid
>> that this is not enough, as we want to use
>> > Beam's portability framework by depending on the modules used
>> (beam-runners-java-fn-execution and the Python SDK Harness) instead
>> > of copying that part of the code to Flink. So it should also allow using
>> the customized coder in beam-runners-java-fn-execution.
>> > Otherwise, we have to copy a lot of code into Flink to use the
>> customized coder.
>>
>> Agreed, beam-runners-java-fn-execution does not take advantage of the
>> full flexibility of the protocol, and would make a lot of sense to
>> enhance it to be able to.
>>
>> > ---
>> >
>> > >> I'm wary of having too many buffer size configuration options (is
>> > there a compelling reason to make it bigger or smaller?) but something
>> > timebased would be very useful.
>> > ---
>> >
>> > I think the default buffer sizes don't need to change for
>> most cases. I'm only unsure about one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
>> > Would 1MB make more sense?
>>
>> IIRC, 10MB was the point at which, according to benchmarks Luke did
>> quite a while ago, there was clearly no performance benefit in making
>> it larger. Coupled with a time-based threshold, I don't see much of an
>> advantage to lowering it.
>
>
> My concern is that the SDK harness may b

Re: [DISCUSS] Turn `WindowedValue<T>` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread Robert Bradshaw
---
>
> The design makes sense to me. My concern is that if a coder is not among the 
> StandardCoders, it will be prefixed with a length even if the harness knows 
> how to decode it.

If the harness knows how to decode it, the length prefixing is just a
lost optimization opportunity, but it'll still work. Whether this is a
big enough loss to merit introducing an extra protocol to negotiate on
the set of commonly known coders beyond standard coders is still TBD,
but probably not for v1 (and possibly not ever, especially as schemas
become more expressive).
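
As a small illustration of the length prefixing mentioned above, using the
Python SDK's LengthPrefixCoder wrapper:

from apache_beam.coders import coders

inner = coders.StrUtf8Coder()
prefixed = coders.LengthPrefixCoder(inner)
encoded = prefixed.encode('hello')  # varint length followed by the payload
assert prefixed.decode(encoded) == 'hello'

The wrapped payload still round-trips correctly; the prefix just costs a few
extra bytes per element on the wire.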

> Besides, I'm also curious about the standard for whether a coder can be put 
> into StandardCoders.
> For example, I noticed that FLOAT is not among StandardCoders, while DOUBLE 
> is among it.

StandardCoders is supposed to be some sort of lowest common
denominator, but there's no hard-and-fast criterion. For this example,
some languages (e.g. Python) don't have the notion of FLOAT, and using
a FLOAT coder for Python floats (whose underlying representation is
double) gets tricky as this coder is not faithful. We also don't have
specific int coders for smaller-than-64-bit types which, like float,
are easily promoted.
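
The faithfulness problem is easy to see in plain Python (no Beam APIs
involved): round-tripping a Python float through a 32-bit encoding does not
return the original value.

import struct

x = 0.1  # Python floats are doubles under the hood
y = struct.unpack('<f', struct.pack('<f', x))[0]
print(x == y)  # False: y is 0.10000000149011612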

> Best, Jincheng
>
> Robert Bradshaw  wrote on Thu, Jul 25, 2019 at 2:00 PM:
>>
>> On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise  wrote:
>> >
>> > Hi Jincheng,
>> >
>> > It is very exciting to see this follow-up, that you have done your 
>> > research on the current state and that there is the intention to join 
>> > forces on the portability effort!
>> >
>> > I have added a few pointers inline.
>> >
>> > Several of the issues you identified affect our usage of Beam as well. 
>> > These present an opportunity for collaboration.
>>
>> +1, a lot of this aligns with improvements we'd like to make as well.
>>
>> > On Wed, Jul 24, 2019 at 2:53 AM jincheng sun  
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> Thanks Max and all of your kind words. :)
>> >>
>> >> Sorry for the late reply as I'm busy working on the Flink 1.9 release. 
>> >> For the next major release of Flink, we plan to add Python user defined 
>> >> functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam 
>> >> portability framework and think that it is perfect for our requirements. 
>> >> However, we also found some improvements needed in Beam:
>> >>
>> >> Must Have:
>> >> 
>> >> 1) Currently only BagState is supported in the gRPC protocol and I think we 
>> >> should support more kinds of state types, such as MapState, ValueState, 
>> >> ReducingState, CombiningState (AggregatingState in Flink), etc. That's 
>> >> because these kinds of state will be used in both user-defined functions 
>> >> and the Flink Python DataStream API.
>> >
>> > There has been discussion about the need for different state types and to 
>> > efficiently support those on the runner side there may be a need to look 
>> > at the over the wire representation also.
>> >
>> > https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
>> > https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E
>>
>> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the
>> runner's API. For example, BagState, ValueState, ReducingState,
>> CombiningState,  can all be implemented on top of a runner-offered
>> MapState in the SDK. On the one hand, there's a desire to keep the
>> number of "primitive" states types to a minimum (to ease the use of
>> authoring runners), but if a runner can perform a specific
>> optimization due to knowing about the particular state type it might
>> be preferable to pass it through rather than emulate it in the SDK.
>>
>> >> 2) There are warnings that Python 3 is not fully supported in Beam 
>> >> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam 
>> >> portability framework, since Python 2 will no longer be officially supported.
>> >
>> > This must be obsolete per latest comments on: 
>> > https://issues.apache.org/jira/browse/BEAM-1251
>> >
>> >> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory 
>> >> at the runner side. Why I think it's a must-have is because when the 
>> >> environment type is "PROCESS", the default value "/tmp" may become a big 
>> >> problem.

Re: Sort Merge Bucket - Action Items

2019-07-26 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov  wrote:
>
> Hi Gleb,
>
> Regarding the future of io.Read: ideally things would go as follows
> - All runners support SDF at feature parity with Read (mostly this is just 
> the Dataflow runner's liquid sharding and size estimation for bounded 
> sources, and backlog for unbounded sources, but I recall that a couple of 
> other runners also used size estimation)
> - Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden to 
> add any new implementations to SDK, and users shouldn't use them either 
> (note: I believe it's already effectively forbidden to use them for cases 
> where a DoFn/SDF at the current level of support will be sufficient)
> - People one by one rewrite existing Bounded/UnboundedSource based 
> PTransforms in the SDK to use SDFs instead
> - Read.from() is rewritten to use a wrapper SDF over the given Source, and 
> explicit support for Read is deleted from runners
> - In the next major version of Beam - presumably 3.0 - the Read transform 
> itself is deleted
>
> I don't know what's the current status of SDF/Read feature parity, maybe Luke 
> or Cham can comment. An alternative path is offered in 
> http://s.apache.org/sdf-via-source.

Python supports initial splitting for SDF of all sources on portable
runners. Dataflow support for batch SDF is undergoing testing, not yet
rolled out. Dataflow support for streaming SDF is awaiting portable
state/timer support.

> On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov  wrote:
>>
>> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away 
>> in favor of SDF, or are we always going to have both?
>>
>> I was looking into AvroIO.read and AvroIO.readAll, both of them use 
>> AvroSource. AvroIO.readAll is using SDF, and it's implemented with 
>> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at 
>> ReadAllViaFileBasedSource I find it not necessary to use Source<T>; it 
>> should be enough to have something like (KV<ReadableFile, OffsetRange>, 
>> OutputReceiver<T>), as we have discussed in this thread, and that should be 
>> fine for SMB as well. It would require duplicating code from AvroSource, but 
>> in the end, I don't see it as a problem if AvroSource is going away.
>>
>> I'm attaching a small diagram I put for myself to better understand the code.
>>
>> AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->
>>
>> FileIO.matchAll :: PTransform<PCollection<String>, 
>> PCollection<MatchResult.Metadata>>
>> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, 
>> PCollection<ReadableFile>>
>> AvroIO.readFiles :: PTransform<PCollection<ReadableFile>, 
>> PCollection<T>> ->
>>
>> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, 
>> PCollection<T>> ->
>>
>> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, 
>> OffsetRange>>) (splittable do fn)
>>
>> Reshuffle.viaRandomKey()
>>
>> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, 
>> OffsetRange>, T>) where
>>
>> createSource :: String -> FileBasedSource<T>
>>
>> createSource = AvroSource
>>
>>
>> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, 
>> PCollection<T>> ->
>>
>> Read.Bounded.from(createSource) where
>>
>> createSource :: String -> FileBasedSource<T>
>>
>> createSource = AvroSource
>>
>>
>> Gleb
>>
>>
>> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw  wrote:
>>>
>>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>>> >
>>> > From the peanut gallery, keeping a separate implementation for SMB seems 
>>> > fine. Dependencies are serious liabilities for both upstream and 
>>> > downstream. It seems like the reuse angle is generating extra work, and 
>>> > potentially making already-complex implementations more complex, instead 
>>> > of helping things.
>>>
>>> +1
>>>
>>> To be clear, what I care about is that WriteFiles(X) and
>>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>>> TFRecord, ...}. In other words composability of the API (vs. manually
>>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>>> opportunities for (easy, clean) implementation sharing, that'd be
>>> nice, but not the primary goal.
>>>
>>> (Similarly for reading, though that seems less obvious. Certainly
>>> whatever T is useful for ReadSmb(T) could be useful for a
>>> (non-liquid-shading) ReadAll(T) however.)
>>>
>>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li  wrote:
>>> >>
>>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be 
>>> >> determined until the last finalize transform, which is again different 
>>> >> from the current SMB proposal (static number of buckets & shards).

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 6:34 PM rahul patwari
 wrote:
>
> So, if an RPC call has to be performed for a batch of Rows (a PCollection), 
> instead of each Row, the recommended way is to batch the Rows in 
> startBundle() of the 
> DoFn (https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?

Yes.
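
For concreteness, a minimal sketch of that pattern in the Python SDK (the
send_rpc helper is hypothetical, standing in for whatever batched call you
make):

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue

class BatchRpcFn(beam.DoFn):
  BATCH_SIZE = 500

  def start_bundle(self):
    self._batch = []

  def process(self, row):
    self._batch.append(row)
    if len(self._batch) >= self.BATCH_SIZE:
      for result in self._flush():
        yield result

  def finish_bundle(self):
    # finish_bundle must emit explicitly windowed values.
    for result in self._flush():
      yield WindowedValue(result, MIN_TIMESTAMP, [GlobalWindow()])

  def _flush(self):
    if not self._batch:
      return []
    results = send_rpc(self._batch)  # hypothetical batched RPC call
    self._batch = []
    return results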

> I thought Stateful and Timely Processing could be helpful here.

The upside is that you can persist state across bundles (which is
especially helpful when bundles are small, e.g. for streaming
pipelines). The downside is that you can't persist state across keys
(and it also enforces a shuffle to colocate the data by key).

If you get to choose your keys, you would want to have about as many
keys as you have concurrent bundles (or some small multiple, to ensure
they're not lumpily distributed). Keying by something like
System.identityHashCode(this) in the body of a DoFn might be
sufficient.
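
In the Python SDK the equivalent trick might look like this (illustrative
sketch):

import apache_beam as beam

class KeyByWorkerFn(beam.DoFn):
  def process(self, element):
    # id(self) plays the role of Java's System.identityHashCode(this):
    # roughly one distinct key per live DoFn instance, and hence per
    # concurrent bundle.
    yield id(self), element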

> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:
>>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>  wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an 
>> > example of assigning an arbitrary-but-consistent index to each element on 
>> > a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say, a 
>> > PCollection with Fixed Windows, the state is maintained per window 
>> > and every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed in 
>> > a single DoFn Instance, which otherwise could have been done in multiple 
>> > parallel instances, limiting performance?
>> > Similarly, how does Stateful ParDo behave on a Bounded Non-Keyed PCollection?
>> >
>> > Thanks,
>> > Rahul


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
Though it's not obvious in the name, Stateful ParDos can only be
applied to keyed PCollections, similar to GroupByKey. (You could,
however, assign every element to the same key and then apply a
Stateful DoFn, though in that case all elements would get processed on
the same worker.)
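
A sketch of that workaround in the Python SDK, adapting the index-assignment
example from the blog post (illustrative only; pcoll stands for your input
PCollection):

import apache_beam as beam
from apache_beam.transforms.userstate import CombiningValueStateSpec

class AssignIndexFn(beam.DoFn):
  INDEX = CombiningValueStateSpec('index', sum)

  def process(self, element, index=beam.DoFn.StateParam(INDEX)):
    _, value = element
    i = index.read()
    index.add(1)
    yield i, value

indexed = (
    pcoll
    | beam.Map(lambda x: (None, x))  # same key everywhere => one worker
    | beam.ParDo(AssignIndexFn()))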

On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
 wrote:
>
> Hi,
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an 
> example of assigning an arbitrary-but-consistent index to each element on a 
> per key-and-window basis.
>
> If the Stateful ParDo is applied on a Non-Keyed PCollection, say, a 
> PCollection with Fixed Windows, the state is maintained per window and 
> every element in the window will be assigned a consistent index?
> Does this mean every element belonging to the window will be processed in a 
> single DoFn Instance, which otherwise could have been done in multiple 
> parallel instances, limiting performance?
> Similarly, how does Stateful ParDo behave on a Bounded Non-Keyed PCollection?
>
> Thanks,
> Rahul


Re: Sort Merge Bucket - Action Items

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>
> From the peanut gallery, keeping a separate implementation for SMB seems 
> fine. Dependencies are serious liabilities for both upstream and downstream. 
> It seems like the reuse angle is generating extra work, and potentially 
> making already-complex implementations more complex, instead of helping 
> things.

+1

To be clear, what I care about is that WriteFiles(X) and
WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
TFRecord, ...}. In other words composability of the API (vs. manually
filling out the matrix). If WriteFiles and WriteSmbFiles find
opportunities for (easy, clean) implementation sharing, that'd be
nice, but not the primary goal.

(Similarly for reading, though that seems less obvious. Certainly
whatever T is useful for ReadSmb(T) could be useful for a
(non-liquid-shading) ReadAll(T) however.)

> On Wed, Jul 24, 2019 at 11:59 AM Neville Li  wrote:
>>
>> I spoke too soon. Turns out for unsharded writes, numShards can't be 
>> determined until the last finalize transform, which is again different from 
>> the current SMB proposal (static number of buckets & shards).
>> I'll end up with more code specialized for SMB in order to generalize 
>> existing sink code, which I think we all want to avoid.
>>
>> Seems the only option is duplicating some logic like temp file handling, 
>> which is exactly what we did in the original PR.
>> I can reuse Compression & Sink for file level writes but that seems about 
>> the most I can reuse right now.
>>
>> On Tue, Jul 23, 2019 at 6:36 PM Neville Li  wrote:
>>>
>>> So I spent one afternoon trying some ideas for reusing the last few 
>>> transforms of WriteFiles.
>>>
>>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, 
>>> Iterable<UserT>>, FileResult<DestinationT>>
>>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, 
>>> PCollection<List<ResultT>>>
>>> => FinalizeTempFileBundles extends 
>>> PTransform<PCollection<List<FileResult<DestinationT>>>, 
>>> WriteFilesResult<DestinationT>>
>>>
>>> I replaced FileResult with a KV so I 
>>> can use pre-computed SMB destination file names for the transforms.
>>> I'm also thinking of parameterizing ShardedKey for SMB's 
>>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are 
>>> private and easy to change/pull out.
>>>
>>> OTOH they are somewhat coupled with the package private 
>>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of 
>>> temp file handling logic lives). It might be hard to decouple, whether by 
>>> modifying existing code or creating new transforms, unless we re-write most 
>>> of FileBasedSink from scratch.
>>>
>>> Let me know if I'm on the wrong track.
>>>
>>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>>
>>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath  
>>> wrote:
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov  
>>>>> wrote:
>>>>> >
>>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw  
>>>>> > wrote:
>>>>> >>
>>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li  
>>>>> >> wrote:
>>>>> >> >
>>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and 
>>>>> >> > see what needs to be done.
>>>>> >> >
>>>>> >> > Eugene pointed out that we shouldn't build on 
>>>>> >> > FileBased{Source,Sink}. So for writes I'll probably build on top of 
>>>>> >> > WriteFiles.
>>>>> >>
>>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>>>> >>
>>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>>>> >
>>>>> > Yeah if possible, parameterize FileIO.Sink.
>>>>> > I would recommend against building on top of WriteFiles either. FileIO 
>>>>> > being implemented on top of WriteFiles was supposed to be a temporary 
>>>>> > measure - the longer-term plan was to rewrite it from scratch (albeit 
>>>>> > with a similar structure) and throw away WriteFiles.
>>>>> > If possible, I would recommend to pursue this path: if there are parts of 
>>>>> > WriteFiles you want to reuse, implement them as new transforms, not at all 
>>>>> > tied to FileBasedSink (but ok if tied to FileIO.Sink).

Re: Write-through-cache in State logic

2019-07-25 Thread Robert Bradshaw
On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar  wrote:
>
> Thanks Robert,
>
> I stumbled on the jira that you created some time ago
> https://jira.apache.org/jira/browse/BEAM-5428
>
> You also marked code where code changes are required:
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>
> I am willing to provide help to implement this. Let me know how I can help.

As far as I'm aware, no one is actively working on it right now.
Please feel free to assign yourself the JIRA entry and I'll be happy
to answer any questions you might have if (well probably when) these
pointers are insufficient.

> On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw  wrote:
>>
>> This is documented at
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>> . Note that it requires participation of both the runner and the SDK
>> (though there are no correctness issues if one or the other side does
>> not understand the protocol, caching just won't be used).
>>
>> I don't think it's been implemented anywhere, but could be very
>> beneficial for performance.
>>
>> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
>> >
>> > I checked the python sdk[1] and it has a similar implementation to the Java SDK.
>> >
>> > I would agree with Thomas. In the case of a high-volume event stream and a 
>> > bigger cluster size, network calls can potentially cause a bottleneck.
>> >
>> > @Robert
>> > I am interested to see the proposal. Can you provide me the link of the 
>> > proposal?
>> >
>> > [1]: 
>> > https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>> >
>> >
>> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
>> >>
>> >> Thanks for the pointer. For streaming, it will be important to support 
>> >> caching across bundles. It appears that even the Java SDK doesn't support 
>> >> that yet?
>> >>
>> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>> >>
>> >> Regarding clear/append: It would be nice if both could occur within a 
>> >> single Fn Api roundtrip when the state is persisted.
>> >>
>> >> Thanks,
>> >> Thomas
>> >>
>> >>
>> >>
>> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>> >>>
>> >>> User state is built on top of read, append and clear and not off a read 
>> >>> and write paradigm to allow for blind appends.
>> >>>
>> >>> The optimization you speak of can be done completely inside the SDK 
>> >>> without any additional protocol being required as long as you clear the 
>> >>> state first and then append all your new data. The Beam Java SDK does 
>> >>> this for all runners when executed portably[1]. You could port the same 
>> >>> logic to the Beam Python SDK as well.
>> >>>
>> >>> 1: 
>> >>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>> >>>
>> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  
>> >>> wrote:
>> >>>>
>> >>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>> >>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>> >>>>
>> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>> >>>> >
>> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do 
>> >>>> > have such a cache, though I think it's currently has a cap for large 
>> >>>> > bags.
>> >>>> >
>> >>>> > Reuven
>> >>>> >
>> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  
>> >>>> > wrote:
>> >>>> >>
>> >>>> >> Hi,
>> >>>> >>
>> >>>> >> I have been using python sdk for the application and also using 
>> >>>> >> BagState in production. I was wondering whether state logic has any 
>> >>>> >> write-through-cache implemented or not. If we are sending every read 
>> >>>> >> and write request through network then it comes with a performance 
>> >>>> >> cost. We can avoid network call for a read operation if we have 
>> >>>> >> write-through-cache.
>> >>>> >> I have superficially looked into the implementation and I didn't see 
>> >>>> >> any cache implementation.
>> >>>> >>
>> >>>> >> is it possible to have this cache? would it cause any issue if we 
>> >>>> >> have the caching layer?
>> >>>> >>


Re: [DISCUSS] Turn `WindowedValue<T>` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise  wrote:
>
> Hi Jincheng,
>
> It is very exciting to see this follow-up, that you have done your research 
> on the current state and that there is the intention to join forces on the 
> portability effort!
>
> I have added a few pointers inline.
>
> Several of the issues you identified affect our usage of Beam as well. These 
> present an opportunity for collaboration.

+1, a lot of this aligns with improvements we'd like to make as well.

> On Wed, Jul 24, 2019 at 2:53 AM jincheng sun  wrote:
>>
>> Hi all,
>>
>> Thanks Max and all of your kind words. :)
>>
>> Sorry for the late reply as I'm busy working on the Flink 1.9 release. For 
>> the next major release of Flink, we plan to add Python user defined 
>> functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam 
>> portability framework and think that it is perfect for our requirements. 
>> However, we also found some improvements needed in Beam:
>>
>> Must Have:
>> 
>> 1) Currently only BagState is supported in the gRPC protocol and I think we 
>> should support more kinds of state types, such as MapState, ValueState, 
>> ReducingState, CombiningState (AggregatingState in Flink), etc. That's 
>> because these kinds of state will be used in both user-defined functions and 
>> the Flink Python DataStream API.
>
> There has been discussion about the need for different state types and to 
> efficiently support those on the runner side there may be a need to look at 
> the over the wire representation also.
>
> https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
> https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E

There are two distinct levels at which one can talk about a certain
type of state being supported: the user-visible SDK's API and the
runner's API. For example, BagState, ValueState, ReducingState,
CombiningState,  can all be implemented on top of a runner-offered
MapState in the SDK. On the one hand, there's a desire to keep the
number of "primitive" states types to a minimum (to ease the use of
authoring runners), but if a runner can perform a specific
optimization due to knowing about the particular state type it might
be preferable to pass it through rather than emulate it in the SDK.

>> 2) There are warnings that Python 3 is not fully supported in Beam 
>> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam 
>> portability framework, since Python 2 will no longer be officially supported.
>
> This must be obsolete per latest comments on: 
> https://issues.apache.org/jira/browse/BEAM-1251
>
>> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at 
>> the runner side. Why I think it's a must-have is because when the 
>> environment type is "PROCESS", the default value "/tmp" may become a big 
>> problem.

There are still some issues to be worked out around exactly how
environments are set up (most notably around dependencies that are
external to the docker images, but also things like this).

>> 4) The buffer size configuration policy should be improved, such as:
>>At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver 
>> is size based. We should also support a time-based limit, especially for the 
>> streaming case.
>>At the Python SDK Harness, the buffer size is not configurable in 
>> GrpcDataService. The input queue size of the input buffer in the Python SDK 
>> Harness is not size limited.
>>   The flush threshold of the output buffer in the Python SDK Harness is 10 MB 
>> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the 
>> threshold configurable and also support a time-based threshold.

I'm wary of having too many buffer size configuration options (is
there a compelling reason to make it bigger or smaller?) but something
time-based would be very useful.
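
For illustration only (this is not Beam's actual harness code), a generic
size-or-time flush policy might look like:

import time

class OutboundBuffer(object):
  def __init__(self, flush, max_bytes=10 << 20, max_age_secs=1.0):
    self._flush = flush
    self._max_bytes = max_bytes
    self._max_age_secs = max_age_secs
    self._data = []
    self._bytes = 0
    self._oldest = None

  def append(self, payload):
    if self._oldest is None:
      self._oldest = time.time()
    self._data.append(payload)
    self._bytes += len(payload)
    # Flush when either the size or the age threshold is hit.
    if (self._bytes >= self._max_bytes or
        time.time() - self._oldest >= self._max_age_secs):
      self._flush(self._data)
      self._data, self._bytes, self._oldest = [], 0, None

Note this only checks age when append is called; a real implementation would
also need a timer to flush a buffer that has gone idle.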

>> Nice To Have:
>> ---
>> 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle, 
>> etc., to change the parameter type from WindowedValue<T> to T. (We have 
>> already discussed this in the previous mails.)
>>
>> 2) Refactor the code to avoid unnecessary dependencies being pulled in. For 
>> example, beam-sdks-java-core (11MB) is a package for Java SDK users and it is 
>> pulled in because a few classes in beam-sdks-java-core are used in 
>> beam-runners-java-fn-execution, such as: 
>> PipelineOptions used in DefaultJobBundleFactory, FileSystems used in 
>> BeamFileSystemArtifactRetrievalService.
>> It means maybe we can add a new module such as beam-sdks-java-common to hold 
>> the classes used by both the runner and the SDK.
>>
>> 3) The state cache is not shared between bundles, which is 
>> performance-critical for streaming jobs.
>
> This is rather important to address:
>
> https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E
>
>>
>>
>> 4) The coder of 

Re: How to expose/use the External transform on Java SDK

2019-07-25 Thread Robert Bradshaw
From the portability perspective,
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto
and the associated services for executing pipelines is about as "core"
as it gets, and eventually I'd like to see all runners being portable
(even if they have an option of running user code in process rather
than in separate docker images) and the API between SDKs and Runners
would be these beam model protos rather than some parallel set of Java
classes. This would argue for #1. (There was also some discussion
recently about merging the transform translation into core as well, as
the current structure of keeping it separate introduces a lot of extra
hoops and makes it difficult to define user-level transforms that have
proper translation, which is along the same lines.)

I'm not quite sure I follow the downsides of leaking the vendored
classes into the users' classpath--isn't the point of vendoring to make
such exposure benign (and as you'd almost always be linking in a
runner, you'd get this anyway).

Finally, from a simple user's API perspective, having
ExternalTransform in core makes a lot of sense and it'd be unfortunate
to contort the API for underlying technical reasons if it can be
avoided.

On Wed, Jul 24, 2019 at 9:18 PM Heejong Lee  wrote:
>
> I think it depends on how we define "the core" part of the SDK. If we define 
> the core as only the (abstract) data types which describe the Beam pipeline 
> model, then it would be more sensible to put the external transform into a 
> separate extension module (option 4). Otherwise, option 1 makes sense.
>
> On Wed, Jul 24, 2019 at 11:56 AM Chamikara Jayalath  
> wrote:
>>
>> The idea of 'ExternalTransform' is to allow users to use transforms in SDK X 
>> from SDK Y. I think this should be a core part of each SDK and corresponding 
>> external transforms ([a] for Java, [b] for Python) should be released with 
>> each SDK. This will also allow us to add core external transforms to some of 
>> the critical transforms that are not available in certain SDKs. So I prefer 
>> option (1).
>>
>> Rebo, I didn't realize there's an external transform in the Go SDK. Looking at 
>> it, it seems like it's more of an interface for native transforms implemented 
>> in each runner, not for cross-language use-cases. Is that correct? Maybe 
>> we can reuse it for the latter as well.
>>
>> Thanks,
>> Cham
>>
>> [a] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
>> [b] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py
>>
>> On Wed, Jul 24, 2019 at 10:25 AM Robert Burke  wrote:
>>>
>>> Ideas inline.
>>>
>>> On Wed, Jul 24, 2019, 9:56 AM Ismaël Mejía  wrote:

 After Beam Summit EU I was curious about the External transform. I was
 interested in the scenario of using it to call Python code in the
 middle of a Java pipeline. This is a potentially useful scenario, for
 example to evaluate models from Python ML frameworks in Java
 pipelines. In my example I did a transform to classify elements in a
 simple Python ParDo and tried to connect it via the Java External
 transform.

 I found that the ExternalTransform code was added into
 `runners/core-construction-java` as part of BEAM-6747 [1]. However
 this code is not exposed currently as part of the Beam Java SDK, so
 end users won’t be able to find it easily. I found this weird and
 thought, well, it would be as simple as moving it into the Java SDK and
 voila!

 But of course this could not be so easy, because this transform calls
 the Expansion service via gRPC and the Java SDK does not have (and
 probably should not have) gRPC in its dependencies.
 So my second reflex was to add it to the Java SDK and translate it as a
 generic expansion in all the runners, but this may not make sense because
 the External transform is not part of the runner translation, since
 this is part of the Pipeline construction process (as pointed out to me by
 Max in a Slack discussion).

 So the question is: How do you think this should be exposed to the end 
 users?

 1. Should we add gRPC with all its deps to the Java SDK core? (This of
 course is not nice, because we would leak our vendored gRPC and
 friends into users' classpaths.)
>>>
>>> If there's separation between the SDK and the Harness then this makes 
>>> sense. Otherwise the portable harness depends on GRPC at present, doesn't 
>>> it? Presently the Go SDK kicks off the harness, and then carries the GRPC 
>>> dependency (Though that's separable if necessary.)

 2. Should we do the dynamic loading of classes only at runtime if the
 transform is used, to avoid the big extra compile dependency (and add
 runners/core-construction-java as a runtime dependency)?
 3. Should we create a ‘shim’ module to hide the gRPC dependency and

Re: On Auto-creating GCS buckets on behalf of users

2019-07-23 Thread Robert Bradshaw
On Tue, Jul 23, 2019 at 10:26 PM Chamikara Jayalath
 wrote:
>
> On Tue, Jul 23, 2019 at 1:10 PM Kyle Weaver  wrote:
>>
>> I agree with David that at least clearer log statements should be added.
>>
>> Udi, that's an interesting idea, but I imagine the sheer number of existing 
>> flags (including many SDK-specific flags) would make it difficult to 
>> implement. In addition, uniform argument names wouldn't necessarily ensure 
>> uniform implementation.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>>
>>
>> On Tue, Jul 23, 2019 at 11:56 AM Udi Meiri  wrote:
>>>
>>> Java SDK creates one regional bucket per project and region combination.
>>> So it's not a lot of buckets - no need to auto-clean.
>
>
> Agree that cleanup is not a big issue if we are only creating a single bucket 
> per project and region. I assume we are creating temporary folders for each 
> pipeline with the same region and project so that they don't conflict (which 
> we clean up).
> As others mentioned, we should clearly document this (including the naming of 
> the bucket) and produce a log during pipeline creation.
>
>>>
>>>
>>> I agree with Robert that having fewer flags is better.
>>> Perhaps what we need is a unifying interface for SDKs that simplifies 
>>> launching?
>>>
>>> So instead of:
>>> mvn compile exec:java -Dexec.mainClass=<main-class> 
>>> -Dexec.args="--runner=DataflowRunner --project=<project> 
>>> --gcpTempLocation=gs://<bucket>/tmp" -Pdataflow-runner
>>> or
>>> python -m <module> --runner DataflowRunner --project <project> 
>>> --temp_location gs://<bucket>/tmp/
>
> Interesting, probably this should be extended to a generalized CLI for Beam 
> that can be easily installed to execute Beam pipelines?

This is starting to get somewhat off-topic from the original question,
but I'm not sure the benefits of providing a wrapper to the end user
would outweigh the costs of having to learn the wrapper. For Python
developers, python -m module, or even python path/to/script.py, is
pretty standard. Java is a bit harder, because one needs to coordinate
a build as well, but I don't know how a "./beam java ..." script would
gloss over whether one is using maven, gradle, ant, or just has a pile
of pre-compiled jars (and would probably have to know a bit about the
project layout as well to invoke the right commands).


Re: Write-through-cache in State logic

2019-07-23 Thread Robert Bradshaw
This is documented at
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
. Note that it requires participation of both the runner and the SDK
(though there are no correctness issues if one or the other side does
not understand the protocol, caching just won't be used).

I don't think it's been implemented anywhere, but could be very
beneficial for performance.

On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar  wrote:
>
> I checked the python sdk[1] and it has a similar implementation to the Java SDK.
>
> I would agree with Thomas. In the case of a high-volume event stream and a 
> bigger cluster size, network calls can potentially cause a bottleneck.
>
> @Robert
> I am interested to see the proposal. Can you provide me the link of the 
> proposal?
>
> [1]: 
> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>
>
> On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise  wrote:
>>
>> Thanks for the pointer. For streaming, it will be important to support 
>> caching across bundles. It appears that even the Java SDK doesn't support 
>> that yet?
>>
>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>
>> Regarding clear/append: It would be nice if both could occur within a single 
>> Fn Api roundtrip when the state is persisted.
>>
>> Thanks,
>> Thomas
>>
>>
>>
>> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik  wrote:
>>>
>>> User state is built on top of read, append and clear and not off a read and 
>>> write paradigm to allow for blind appends.
>>>
>>> The optimization you speak of can be done completely inside the SDK without 
>>> any additional protocol being required as long as you clear the state first 
>>> and then append all your new data. The Beam Java SDK does this for all 
>>> runners when executed portably[1]. You could port the same logic to the 
>>> Beam Python SDK as well.
>>>
>>> 1: 
>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>
>>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw  wrote:
>>>>
>>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>>>>
>>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>>>> >
>>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have 
>>>> > such a cache, though I think it's currently has a cap for large bags.
>>>> >
>>>> > Reuven
>>>> >
>>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  
>>>> > wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> I have been using python sdk for the application and also using 
>>>> >> BagState in production. I was wondering whether state logic has any 
>>>> >> write-through-cache implemented or not. If we are sending every read 
>>>> >> and write request through network then it comes with a performance 
>>>> >> cost. We can avoid network call for a read operation if we have 
>>>> >> write-through-cache.
>>>> >> I have superficially looked into the implementation and I didn't see 
>>>> >> any cache implementation.
>>>> >>
>>>> >> is it possible to have this cache? would it cause any issue if we have 
>>>> >> the caching layer?
>>>> >>


Re: On Auto-creating GCS buckets on behalf of users

2019-07-23 Thread Robert Bradshaw
I think having a single, default, auto-created temporary bucket per
project for use in GCP (when running on Dataflow, or running elsewhere
but using GCS such as for this BQ load files example), though not
ideal, is the best user experience. If we don't want to be
automatically creating such things for users by default, another
option would be a single flag that opts-in to such auto-creation
(which could include other resources in the future).

On Tue, Jul 23, 2019 at 1:08 AM Pablo Estrada  wrote:
>
> Hello all,
> I recently worked on a transform to load data into BigQuery by writing files 
> to GCS, and issuing Load File jobs to BQ. I did this for the Python SDK[1].
>
> This option requires the user to provide a GCS bucket to write the files:
>
> If the user provides a bucket to the transform, the SDK will use that bucket.
> If the user does not provide a bucket:
>
> When running in Dataflow, the SDK will borrow the temp_location of the 
> pipeline.
> When running in other runners, the pipeline will fail.
>
> The Java SDK has had functionality for File Loads into BQ for a long time; 
> and particularly, when users do not provide a bucket, it attempts to create a 
> default bucket[2]; and this bucket is used as temp_location (which then is 
> used by the BQ File Loads transform).
>
> I do not really like creating GCS buckets on behalf of users. In Java, the 
> outcome is that users will not have to pass a --tempLocation parameter when 
> submitting jobs to Dataflow - which is a nice convenience, but I'm not sure 
> that this is in line with users' expectations.
>
> Currently, the options are:
>
> Adding support for bucket autocreation for Python SDK
> Deprecating support for bucket autocreation in Java SDK, and printing a 
> warning.
>
> I am personally inclined for #1. But what do others think?
>
> Best
> -P.
>
> [1] https://github.com/apache/beam/pull/7892
> [2] 
> https://github.com/apache/beam/blob/5b3807be717277e3e6880a760b036fecec3bc95d/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L294-L343


Re: Sort Merge Bucket - Action Items

2019-07-22 Thread Robert Bradshaw
On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov  wrote:
>
> On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw  wrote:
>>
>> On Mon, Jul 22, 2019 at 4:04 PM Neville Li  wrote:
>> >
>> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what 
>> > needs to be done.
>> >
>> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So 
>> > for writes I'll probably build on top of WriteFiles.
>>
>> Meaning it could be parameterized by FileIO.Sink, right?
>>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>
> Yeah if possible, parameterize FileIO.Sink.
> I would recommend against building on top of WriteFiles either. FileIO being 
> implemented on top of WriteFiles was supposed to be a temporary measure - the 
> longer-term plan was to rewrite it from scratch (albeit with a similar 
> structure) and throw away WriteFiles.
> If possible, I would recommend to pursue this path: if there are parts of 
> WriteFiles you want to reuse, I would recommend to implement them as new 
> transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), 
> with the goal in mind that FileIO could be rewritten on top of these new 
> transforms, or maybe parts of WriteFiles could be swapped out for them 
> incrementally.

Thanks for the feedback. There's a lot that was done, but looking at
the code it feels like there's a lot that was not yet done either, and
the longer-term plan wasn't clear (though perhaps I'm just not finding
the right docs).

>> > Read might be a bigger change w.r.t. collocating ordered elements across 
>> > files within a bucket and TBH I'm not even sure where to start.
>>
>> Yeah, here we need an interface that gives us ReadableFile ->
>> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>> PCollection<T>> but such an interface is insufficient to extract
>> ordered records per shard. It seems the only concrete implementations
>> are based on FileBasedSource, which we'd like to avoid, but there's no
>> alternative. An SDF, if exposed, would likely be overkill and
>> cumbersome to call (given the reflection machinery involved in
>> invoking DoFns).
>
> Seems easiest to just define a new regular Java interface for this.
> Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, 
> e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control 
> over iteration you need.

For this application, one wants to iterate over several files in
parallel. The downside of a new interface is that it shares almost
nothing with the "normal" sources (e.g. when features (or
optimizations) get added to one, they won't get added to the other).

> And yes, DoFn's including SDF's are not designed to be used as Java 
> interfaces per se. If you need DoFn machinery in this interface (e.g. side 
> inputs), use Contextful - s.apache.org/context-fn.

Yeah, one of the primary downsides to the NewDoFns is how hard it is
to build new DoFns out of others (or, really, use them in any context
other than as an argument to ParDo).

>> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>
>> Sounds good.
>>
>> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li  wrote:
>> >> >
>> >> > Forking this thread to discuss action items regarding the change. We 
>> >> > can keep technical discussion in the original thread.
>> >> >
>> >> > Background: our SMB POC showed promising performance & cost saving 
>> >> > improvements and we'd like to adopt it for production soon (by EOY). We 
>> >> > want to contribute it to Beam so it's better generalized and 
>> >> > maintained. We also want to avoid divergence between our internal 
>> >> > version and the PR while it's in progress, specifically any breaking 
>> >> > change in the produced SMB data.
>> >>
>> >> All good goals.
>> >>
>> >> > To achieve that I'd like to propose a few action items.
>> >> >
>> >> > 1. Reach a consensus about bucket and shard strategy, key handling, 
>> >> > bucket file and metadata format, etc., anything that affect produced 
>> >> > SMB data.
>> >> > 2. Revise the existing PR according to #1
>> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, 
>> >> > etc., but keep the existing file level abstraction

Re: Sort Merge Bucket - Action Items

2019-07-22 Thread Robert Bradshaw
On Mon, Jul 22, 2019 at 4:04 PM Neville Li  wrote:
>
> Thanks Robert. Agree with the FileIO point. I'll look into it and see what 
> needs to be done.
>
> Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for 
> writes I'll probably build on top of WriteFiles.

Meaning it could be parameterized by FileIO.Sink, right?

https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779

> Read might be a bigger change w.r.t. collocating ordered elements across 
> files within a bucket and TBH I'm not even sure where to start.

Yeah, here we need an interface that gives us ReadableFile ->
Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
PCollection<T>> but such an interface is insufficient to extract
ordered records per shard. It seems the only concrete implementations
are based on FileBasedSource, which we'd like to avoid, but there's no
alternative. An SDF, if exposed, would likely be overkill and
cumbersome to call (given the reflection machinery involved in
invoking DoFns).

> I'll file separate PRs for core changes needed for discussion. WDYT?

Sounds good.

> On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw  wrote:
>>
>> On Fri, Jul 19, 2019 at 5:16 PM Neville Li  wrote:
>> >
>> > Forking this thread to discuss action items regarding the change. We can 
>> > keep technical discussion in the original thread.
>> >
>> > Background: our SMB POC showed promising performance & cost saving 
>> > improvements and we'd like to adopt it for production soon (by EOY). We 
>> > want to contribute it to Beam so it's better generalized and maintained. 
>> > We also want to avoid divergence between our internal version and the PR 
>> > while it's in progress, specifically any breaking change in the produced 
>> > SMB data.
>>
>> All good goals.
>>
>> > To achieve that I'd like to propose a few action items.
>> >
>> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket 
>> > file and metadata format, etc., anything that affect produced SMB data.
>> > 2. Revise the existing PR according to #1
>> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, 
>> > etc., but keep the existing file level abstraction
>> > 4. (Optional) Merge code into extensions::smb but mark clearly as 
>> > @experimental
>> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, 
>> > GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >
>> > #1-4 gives us something usable in the short term, while #1 guarantees that 
>> > production data produced today are usable when #5 lands on master. #4 also 
>> > gives early adopters a chance to give feedback.
>> > Due to the scope of #5, it might take much longer and a couple of big PRs 
>> > to achieve, which we can keep iterating on.
>> >
>> > What are your thoughts on this?
>>
>> I would like to see some resolution on the FileIO abstractions before
>> merging into experimental. (We have a FileBasedSink that would mostly
>> already work, so it's a matter of coming up with an analogous Source
>> interface.) Specifically I would not want to merge a set of per file
>> type smb IOs without a path forward to this or the determination that
>> it's not possible/desirable.


Re: python precommits failing at head

2019-07-22 Thread Robert Bradshaw
This was due to a bad release artifact push. This has now been fixed upstream.

On Mon, Jul 22, 2019 at 11:00 AM Robert Bradshaw  wrote:
>
> Looks like https://sourceforge.net/p/docutils/bugs/365/
>
> On Sun, Jul 21, 2019 at 11:56 PM Tanay Tummalapalli  
> wrote:
>>
>> Hi everyone,
>>
>> The Python PreCommit from the Jenkins job "beam_PreCommit_Python_Cron" is 
>> failing[1]. The task :sdks:python:docs is failing with this traceback:
>>
>> Traceback (most recent call last):
>> File 
>> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/bin/sphinx-apidoc",
>>  line 8, in <module>
>> from sphinx.apidoc import main
>> File 
>> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/sphinx/apidoc.py",
>>  line 27, in <module>
>> from sphinx.quickstart import EXTENSIONS
>> File 
>> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/sphinx/quickstart.py",
>>  line 35, in <module>
>> from docutils.utils import column_width
>> File 
>> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/docutils/utils/__init__.py",
>>  line 21, in <module>
>> import docutils.io
>> File 
>> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/docutils/io.py",
>>  line 348
>> (self.destination.mode, mode)), file=self._stderr)
>> ^
>> SyntaxError: invalid syntax
>>
>> It seems to be due to a bug in docutils which is a dependency of sphinx.
>> Unfortunately, I don't have much context about this. I'd love to help with 
>> fixing this.
>>
>> There is another failure in test_wordcount_it of task 
>> :sdks:python:test-suites:dataflow:preCommitIT, but it seems to be a flaky 
>> test.
>>
>> [1] 
>> https://scans.gradle.com/s/ygctjiqxcz6qa/console-log?task=:sdks:python:docs
>>
>> On Mon, Jul 15, 2019 at 11:16 PM Yifan Zou  wrote:
>>>
>>> We saw similar timeouts of the python precommit and it usually breaks the 
>>> Jenkins build workers. I've run the precommit manually several times. It 
>>> sometimes gets stuck at :sdks:python:docs and consumes 80G+ memory. Our build 
>>> VMs eventually ran out of memory (104G memory available in total) then 
>>> disconnected. Not sure what happened during that job.
>>>
>>>
>>>
>>> On Sat, Jul 13, 2019 at 10:27 PM Tanay Tummalapalli  
>>> wrote:
>>>>
>>>> Yes. It passed on the second attempt.
>>>>
>>>> But, I'm yet to figure out why it hangs for ~1.5 hours.
>>>>
>>>> On Sun, Jul 14, 2019 at 10:36 AM Rakesh Kumar  wrote:
>>>>>
>>>>>
>>>>>
>>>>> Even I am running into the same issue. Though my test passed, somehow 
>>>>> the task didn't terminate, and eventually the task was aborted.  I have 
>>>>> already tried a couple of times to retrigger the python precommit but it 
>>>>> failed every time.
>>>>>
>>>>> @Tanay did it pass it for you?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 12, 2019 at 2:22 PM Tanay Tummalapalli  
>>>>> wrote:
>>>>>>
>>>>>> Thank You Valentyn!
>>>>>>
>>>>>> I'll retest it.
>>>>>> Hopefully, it's a transient issue.
>>>>>>
>>>>>> Regards,
>>>>>> - Tanay Tummalapalli
>>>>>>
>>>>>> On Sat, Jul 13, 2019 at 2:39 AM Valentyn Tymofieiev 
>>>>>>  wrote:
>>>>>>>
>>>>>>> No, we did not reduce the timeout recently. Looking at console logs, 
>>>>>>> nothing happened for an hour or so,
>>>>>>>
>>>>>>> 06:57:50   py27-cython: commands succeeded
>>>>>>> 06:57:50   congratulations :)
>>>>>>> 06:57:50
>>>>>>>
>>>>>>> 06:57:50 > Task :sdks:python:preCommitPy2
>>>>>>> 08:22:33 Build timed out (after 120 minutes). Marking the build as aborted.

Re: python precommits failing at head

2019-07-22 Thread Robert Bradshaw
Looks like https://sourceforge.net/p/docutils/bugs/365/

On Sun, Jul 21, 2019 at 11:56 PM Tanay Tummalapalli 
wrote:

> Hi everyone,
>
> The Python PreCommit from the Jenkins job "beam_PreCommit_Python_Cron" is
> failing[1]. The task :sdks:python:docs is failing with this traceback:
>
> Traceback (most recent call last):
> File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/bin/sphinx-apidoc",
> line 8, in <module>
> from sphinx.apidoc import main
> File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/sphinx/apidoc.py",
> line 27, in <module>
> from sphinx.quickstart import EXTENSIONS
> File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/sphinx/quickstart.py",
> line 35, in <module>
> from docutils.utils import column_width
> File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/docutils/utils/__init__.py",
> line 21, in <module>
> import docutils.io
> File
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/build/srcs/sdks/python/target/.tox-docs/docs/local/lib/python2.7/site-packages/docutils/io.py",
> line 348
> (self.destination.mode, mode)), file=self._stderr)
> ^
> SyntaxError: invalid syntax
>
> It seems to be due to a bug in docutils which is a dependency of sphinx.
> Unfortunately, I don't have much context about this. I'd love to help with
> fixing this.
>
> There is another failure in test_wordcount_it of task 
> :sdks:python:test-suites:dataflow:preCommitIT,
> but it seems to be a flaky test.
>
> [1]
> https://scans.gradle.com/s/ygctjiqxcz6qa/console-log?task=:sdks:python:docs
>
> On Mon, Jul 15, 2019 at 11:16 PM Yifan Zou  wrote:
>
>> We saw similar timeouts of the python precommit and it usually breaks the
>> Jenkins build workers. I've run the precommit manually several times. It
>> sometimes gets stuck at :sdks:python:docs and consumes 80G+ memory. Our build
>> VMs eventually ran out of memory (104G memory available in total) then
>> disconnected. Not sure what happened during that job.
>>
>>
>>
>> On Sat, Jul 13, 2019 at 10:27 PM Tanay Tummalapalli 
>> wrote:
>>
>>> Yes. It passed on the second attempt.
>>>
>>> But, I'm yet to figure out why it hangs for ~1.5 hours.
>>>
>>> On Sun, Jul 14, 2019 at 10:36 AM Rakesh Kumar 
>>> wrote:
>>>


 Even I am running into the same issue. Though my test passed,
 somehow
 the task didn't terminate, and eventually the task was aborted.  I have already
 tried a couple of times to retrigger the python precommit but it failed
 every time.

 @Tanay did it pass it for you?





 On Fri, Jul 12, 2019 at 2:22 PM Tanay Tummalapalli 
 wrote:

> Thank You Valentyn!
>
> I'll retest it.
> Hopefully, it's a transient issue.
>
> Regards,
> - Tanay Tummalapalli
>
> On Sat, Jul 13, 2019 at 2:39 AM Valentyn Tymofieiev <
> valen...@google.com> wrote:
>
>> No, we did not reduce the timeout recently. Looking at console logs,
>> nothing happened for an hour or so,
>>
>> 06:57:50   py27-cython: commands succeeded
>> 06:57:50   congratulations :)
>> 06:57:50
>>
>> 06:57:50 > Task :sdks:python:preCommitPy2
>> 08:22:33 Build timed out (after 120 minutes). Marking the build as aborted.
>>
>>
>> However, we can also see in the logs that the py36-cython suite never
>> started, not sure why. I assume gradle waited for this suite to finish.
>> Try "retest this please", hopefully this is a transient gradle issue.
>> I did not observe it before.
>>
>> On Fri, Jul 12, 2019 at 1:22 PM Tanay Tummalapalli <
>> ttanay...@gmail.com> wrote:
>>
>>> Hi Udi,
>>>
>>> I rebased another PR[1] onto the fix mentioned above. The lint error
>>> is fixed, but the "beam_PreCommit_Python_Commit" Jenkins job is failing
>>> because of a timeout at 120 minutes[2].
>>> The log says "Build timed out (after 120 minutes). Marking the
>>> build as aborted."
>>> Another PR's Python PreCommit job aborted with the same error[3].
>>>
>>> I found this issue - "[BEAM-3040] Python precommit timed out after
>>> 150 minutes"[4].
>>> Was the timeout reduced recently?
>>>
>>> Regards,
>>> - Tanay Tummalapalli
>>>
>>> [1] https://github.com/apache/beam/pull/8871
>>> [2]
>>> https://builds.apache.org/job/beam_PreCommit_Python_Commit/7412/consoleFull
>>>
>>> [3] https://github.com/apache/beam/pull/9050
>>> [4] https://issues.apache.org/jira/browse/BEAM-3040
>>>
>>> On 

Re: Sort Merge Bucket - Action Items

2019-07-22 Thread Robert Bradshaw
On Fri, Jul 19, 2019 at 5:16 PM Neville Li  wrote:
>
> Forking this thread to discuss action items regarding the change. We can keep 
> technical discussion in the original thread.
>
> Background: our SMB POC showed promising performance & cost saving 
> improvements and we'd like to adopt it for production soon (by EOY). We want 
> to contribute it to Beam so it's better generalized and maintained. We also 
> want to avoid divergence between our internal version and the PR while it's 
> in progress, specifically any breaking change in the produced SMB data.

All good goals.

> To achieve that I'd like to propose a few action items.
>
> 1. Reach a consensus about bucket and shard strategy, key handling, bucket 
> file and metadata format, etc., anything that affect produced SMB data.
> 2. Revise the existing PR according to #1
> 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., 
> but keep the existing file level abstraction
> 4. (Optional) Merge code into extensions::smb but mark clearly as 
> @experimental
> 5. Incorporate ideas from the discussion, e.g. ShardingFn, 
> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>
> #1-4 gives us something usable in the short term, while #1 guarantees that 
> production data produced today are usable when #5 lands on master. #4 also 
> gives early adopters a chance to give feedback.
> Due to the scope of #5, it might take much longer and a couple of big PRs to 
> achieve, which we can keep iterating on.
>
> What are your thoughts on this?

I would like to see some resolution on the FileIO abstractions before
merging into experimental. (We have a FileBasedSink that would mostly
already work, so it's a matter of coming up with an analogous Source
interface.) Specifically I would not want to merge a set of per file
type smb IOs without a path forward to this or the determination that
it's not possible/desirable.
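
To make that concrete, here is a rough sketch (in Python for brevity;
the real interface would live in the Java SDK) of what a per-file read
analogue of FileIO.Sink could look like -- all names here are
hypothetical, not an existing Beam API:

    import abc

    class FileSource(abc.ABC):
        """Mirror of FileIO.Sink: turns one open file into records."""

        @abc.abstractmethod
        def open(self, channel):
            """Prepare to read from a readable byte channel."""

        @abc.abstractmethod
        def read_records(self):
            """Yield records sequentially; SMB needs no offset/split."""

An SMB read transform parameterized by one such FileSource per input
could then do the merge itself, independently of the underlying file
format.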


Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-18 Thread Robert Bradshaw
On Wed, Jul 17, 2019 at 9:12 PM Gleb Kanterov  wrote:
>>
>> Suppose one assigns a sharding function to a PCollection. Is it lazy,
>> or does it induce a reshuffle right at that point? In either case,
>> once the ShardingFn has been applied, how long does it remain in
>> effect? Does it prohibit the runner (or user) from doing subsequent
>> resharding (including dynamic load balancing)? What happens when one
>> has a DoFn that changes the value? (Including the DoFns in our sinks
>> that assign random keys.)
>
>
> What if we would reason about sharding in the same way as we reason about 
> timestamps?
>
> Please correct me if I am wrong: as far as I know, in Beam, timestamps exist for 
> each element. You can get the timestamp by using Reify.timestamps. If there are 
> timestamped values, and they go through ParDo, timestamps are preserved.

That is correct.

> We can think of the same with sharding, where Reify.shards would be 
> PTransform, ShardedValue> and ShardedValue would contain a 
> shard and a grouping key.

Meaning the shard that the PCollection is currently sharded by, or the
one that it should be sharded by in the future. (Your use case is a
bit strange in that a single key may be spread across multiple shards,
as long as they're part of the same "bucket.")
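
To pin down the proposal, a reified sharded value might look something
like this (a hypothetical sketch in Python; Gleb's proposal targets the
Java SDK, and none of these names exist in Beam):

    import typing

    class ShardedValue(typing.NamedTuple):
        shard: int           # the shard this element currently lives in
        grouping_key: bytes  # the key the sharding was derived from
        value: typing.Any    # the element itself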

> ParDo wouldn't change sharding and would propagate ShardingFn.

The ShardingFn may not be applicable to downstream (mutated) elements.

FYI, internally this is handled by having annotations on DoFns as
being key-preserving, and only reasoning about operations separated by
such DoFns.

> CoGroupByKey on such PTransforms would reify grouping key, and do regular 
> CoGroupByKey, or be rewritten to a regular ParDo if sharding of inputs is 
> compatible.
>
> As you mentioned, it requires dynamic work rebalancing to preserve sharding. 
> What if we do dynamic work rebalancing for each shard independently, as, I 
> guess, it's done today for fixed windows.

Currently, the unit of colocation is by key. Generally sharding
introduces a notion of colocation where multiple keys (or multiple
elements, I suppose it need not be keyed) are promised to be processed
by the same machine. This is both too constraining (wrt dynamic
resharding) and not needed (with respect to SMB, as your "colocation"
is per bucket, but buckets themselves can be processed in a
distributed manner).

> When we do a split, we would split one shard into two. It should be possible 
> to do this consistently if values within buckets are sorted; in this case, we 
> would split ranges of possible values.

I'm not quite following here. Suppose one processes element a, m, and
z. Then one decides to split the bundle, but there's no "range" we
can pick for the "other", as this bundle already spans the whole range.
But maybe I'm just off in the weeds here.

> On Wed, Jul 17, 2019 at 6:37 PM Robert Bradshaw  wrote:
>>
>> On Wed, Jul 17, 2019 at 4:26 PM Gleb Kanterov  wrote:
>> >
>> > I find there is an interesting point in the comments brought by Ahmed 
>> > Eleryan. Similar to WindowFn, having a concept of ShardingFn, that enables 
>> > users to implement a class for sharding data. Each Beam node can have 
>> > ShardingFn set, similar to WindowFn (or WindowingStrategy). Sinks and 
>> > sources are aware of that and preserve this information. Using that it's 
>> > possible to do optimization on Beam graph, removing redundant 
>> > CoGroupByKey, and it would be transparent to users.
>> >
>> > It feels like a nice addition to the Beam model, or possibly we can 
>> > implement it using existing windowing mechanics. There are people on the 
>> > list with strong experience in the area, I'm wondering what do you think?
>>
>> I've actually thought about this some, though it's been quite a while.
>> At the time it seemed hard to work it into a cohesive part of the
>> model (even ignoring the fact that sharding is primarily an execution,
>> rather than logical, property).
>>
>> Suppose one assigns a sharding function to a PCollection. Is it lazy,
>> or does it induce a reshuffle right at that point? In either case,
>> once the ShardingFn has been applied, how long does it remain in
>> effect? Does it prohibit the runner (or user) from doing subsequent
>> resharding (including dynamic load balancing)? What happens when one
>> has a DoFn that changes the value? (Including the DoFns in our sinks
>> that assign random keys.)
>>
>> Right now one can get most of the semantics of sharding by keying by
>> the shard id and doing a GBK, where the resulting value set (which is
>> allowed to be arbitrarily big) is the (indivisible) shard (e.g. for

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-17 Thread Robert Bradshaw
nd merge 
>>>> sort.
>>>> We open the files in a DoFn, and emit KV where the CGBKR 
>>>> encapsulates Iterable from each input.
>>>> Basically we need a simple API like ResourceId -> Iterator, i.e. 
>>>> sequential read, no block/offset/split requirement.
>>>> FileBasedSource.FileBasedReader seems the closest fit but they're nested & 
>>>> decoupled.
>>>> There's no FileIO.Read, only a ReadMatches[1], which can be used with 
>>>> ReadAllViaFileBasedSource. But that's not the granularity we need, 
>>>> since we lose ordering of the input records, and can't merge 2+ sources.
>>>>
>>>> Writer
>>>>
>>>> We get a `PCollection>` after bucket and 
>>>> sort, where Iterable is the records sorted by key and BucketShardId is 
>>>> used to produce filename, e.g. bucket-1-shard-2.avro.
>>>> We write each Iterable to a temp file and move to final destination 
>>>> when done. Both should ideally reuse existing code.
>>>> Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO) supports 
>>>> record writing into a WritableByteChannel, but some logic like compression 
>>>> is handled in FileIO through ViaFileBasedSink which extends FileBasedSink.
>>>> FileIO uses WriteFiles[3] to shard and write a PCollection. Again we 
>>>> lose ordering of the output records and the custom file naming scheme. However, 
>>>> WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in WriteFiles 
>>>> seem closest to our need but would have to be split out and generalized.
>>>>
>>>> Note on reader block/offset/split requirement
>>>>
>>>> Because of the merge sort, we can't split or offset seek a bucket file: 
>>>> without persisting the offset index of a key group somewhere, we 
>>>> can't efficiently skip to a key group without exhausting the previous 
>>>> ones. Furthermore we need to merge sort and align keys from multiple 
>>>> sources, which may not have the same key distribution. It might be 
>>>> possible to binary search for matching keys but that's extra complication. 
>>>> IMO the reader work distribution is better solved by better bucket/shard 
>>>> strategy in upstream writer.
>>>>
>>>> References
>>>>
>>>> ReadMatches extends PTransform<PCollection<MatchResult.Metadata>, 
>>>> PCollection<ReadableFile>>
>>>> ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, 
>>>> PCollection<T>>
>>>> WriteFiles<UserT, DestinationT, OutputT> extends 
>>>> PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
>>>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, 
>>>> Iterable<UserT>>, FileResult<DestinationT>>
>>>> FinalizeTempFileBundles extends PTransform< 
>>>> PCollection<List<FileResult<DestinationT>>>, 
>>>> WriteFilesResult<DestinationT>>
>>>>
>>>>
>>>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw  
>>>> wrote:
>>>>>
>>>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov  
>>>>> wrote:
>>>>> >
>>>>> > Quick note: I didn't look through the document, but please do not build 
>>>>> > on either FileBasedSink or FileBasedReader. They are both remnants of 
>>>>> > the old, non-composable IO world; and in fact much of the composable IO 
>>>>> > work emerged from frustration with their limitations and recognizing 
>>>>> > that many other IOs were suffering from the same limitations.
>>>>> > Instead of FileBasedSink, build on FileIO.write; instead of 
>>>>> > FileBasedReader, build on FileIO.read.
>>>>>
>>>>> +1
>>>>>
>>>>> I think the sink could be written atop FileIO.write, possibly using
>>>>> dynamic destinations. At the very least the FileSink interface, which
>>>>> handles the details of writing a single shard, would be an ideal way
>>>>> to parameterize an SMB sink. It seems that none of our existing IOs
>>>>> (publicly?) expose FileSink implementations.
>>>>>
>>>>> FileIO.read is not flexible enough to do the merging. Eugene, is there
>>>>> a composable analogue to FileSink, for sources, i.e. something that
>>>>> can turn a file handle (possibly with offsets) into a set of records
>>>>> other than FileBasedReader?
>>>>>
>>>>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov  wrote:
>>>>> >

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-17 Thread Robert Bradshaw
>>> Because of the merge sort, we can't split or offset seek a bucket file: 
>>> without persisting the offset index of a key group somewhere, we 
>>> can't efficiently skip to a key group without exhausting the previous ones. 
>>> Furthermore we need to merge sort and align keys from multiple sources, 
>>> which may not have the same key distribution. It might be possible to 
>>> binary search for matching keys but that's extra complication. IMO the 
>>> reader work distribution is better solved by better bucket/shard strategy 
>>> in upstream writer.
>>>
>>> References
>>>
>>> ReadMatches extends PTransform<PCollection<MatchResult.Metadata>, 
>>> PCollection<ReadableFile>>
>>> ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, 
>>> PCollection<T>>
>>> WriteFiles<UserT, DestinationT, OutputT> extends 
>>> PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
>>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, 
>>> Iterable<UserT>>, FileResult<DestinationT>>
>>> FinalizeTempFileBundles extends PTransform< 
>>> PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>>>
>>>
>>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw  wrote:
>>>>
>>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov  
>>>> wrote:
>>>> >
>>>> > Quick note: I didn't look through the document, but please do not build 
>>>> > on either FileBasedSink or FileBasedReader. They are both remnants of 
>>>> > the old, non-composable IO world; and in fact much of the composable IO 
>>>> > work emerged from frustration with their limitations and recognizing 
>>>> > that many other IOs were suffering from the same limitations.
>>>> > Instead of FileBasedSink, build on FileIO.write; instead of 
>>>> > FileBasedReader, build on FileIO.read.
>>>>
>>>> +1
>>>>
>>>> I think the sink could be written atop FileIO.write, possibly using
>>>> dynamic destinations. At the very least the FileSink interface, which
>>>> handles the details of writing a single shard, would be an ideal way
>>>> to parameterize an SMB sink. It seems that none of our existing IOs
>>>> (publicly?) expose FileSink implementations.
>>>>
>>>> FileIO.read is not flexible enough to do the merging. Eugene, is there
>>>> a composable analogue to FileSink, for sources, i.e. something that
>>>> can turn a file handle (possibly with offsets) into a set of records
>>>> other than FileBasedReader?
>>>>
>>>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov  wrote:
>>>> >>
>>>> >> I share the same concern with Robert regarding re-implementing parts of 
>>>> >> IO. At the same time, in the past, I worked on internal libraries that 
>>>> >> try to re-use code from existing IO, and it's hardly possible because 
>>>> >> it feels like it wasn't designed for re-use. There are a lot of classes 
>>>> >> that are nested (non-static) or non-public. I can understand why they 
>>>> >> were made non-public, it's a hard abstraction to design well and keep 
>>>> >> compatibility. As Neville mentioned, decoupling readers and writers 
>>>> >> would not only benefit for this proposal but for any other use-case 
>>>> >> that has to deal with low-level API such as FileSystem API, that is 
>>>> >> hardly possible today without copy-pasting,
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li  
>>>> >> wrote:
>>>> >>>
>>>> >>> Re: avoiding mirroring IO functionality, what about:
>>>> >>>
>>>> >>> - Decouple the nested FileBasedSink.Writer and 
>>>> >>> FileBasedSource.FileBasedReader, make them top level and remove 
>>>> >>> references to parent classes.
>>>> >>> - Simplify the interfaces, while maintaining support for block/offset 
>>>> >>> read & sequential write.
>>>> >>> - As a bonus, the refactored IO classes can be used standalone in case 
>>>> >>> when the user wants to perform custom IO in a DoFn, i.e. a 
>>>> >>> PTransform, PCollection>>. 
>>>> >>> Today this requires a lot of copy-pasted Avro boilerplate.
>>>> >>> - For compatibility, we can delegate to the new classes

Re: [DISCUSS] Contributor guidelines for iterating on PRs: when to squash commits.

2019-07-17 Thread Robert Bradshaw
Sounds good.

I think the high level bit is that whoever merges should *think* about
what they're putting in the history, even if it's just pausing to
think "should I squash or merge this PR" rather than just clicking the
button.

On Wed, Jul 17, 2019 at 4:59 PM Valentyn Tymofieiev  wrote:
>
> Thanks everyone for the discussion and your thoughts.
>
> Here's my summary:
>
> We don't have to be too prescriptive about who does what and when if we keep 
> these goals in mind:
>
> 1. When a PR is being merged, each commit should clearly do something that it 
> states, and a commit should do just one thing.
> 2. While a PR is being reviewed, authors should refrain from squashing new 
> changes into previously reviewed commits.
>
> Committers who do the merge can help enforce #1, and every contributor can 
> help shape review culture to follow #2.
>
> For example, assume we have the following PRs that were LGTMed:
>
> PR #1:
> a00 "[BEAM-1234] Add feature X"
> a01 "fixup: Address review  comments."
> a02 "fixup! Fix lint."
>
> A committer can Squash-and-merge PR #1, and clean up fixup messages from the 
> merge commit description. As far as I know, fixup: vs fixup! makes no 
> difference.
>
> PR #2
> a03 "[BEAM-1234] Add feature Y"
> a04 "[BEAM-1234] Add feature Z"
> a05 "fixup: Fix lint."
>
> A committer cannot squash-and-merge PR #2 using the current GitHub UI, since 
> features Y and Z will be combined into 1 commit, which violates item 1. 
> Therefore, a committer should ask the author to squash a05 once the PR has 
> been LGTMed.
>
> Authors can squash their changes and do force pushes as long as they don't 
> squash changes into commits that were commented on, which renders reviewer's 
> comments "Outdated". Although outdated comments can still be found on PR 
> page, discussion thread is hard to follow and does comments threads do not 
> appear when reviewing the PR, e.g in "Files changed". Authors can still 
> squash new commits between review iterations, for example:
> 1. Author adds commit a00 "[BEAM-1234] Add feature X" and requests review.
> 2. Author adds a01 "fixup: Address review  comments." and pushes the 
> branch.
> 3. Author notices that lint tests fail and adds a02 "fixup! Fix lint."
> 4. Author can squash a02 into a01 if they wish, and force-push two 
> commits:
>
> a00 "[BEAM-1234] Add feature X"   - commit hash not changed after 
> squash & force-push
> b00 "fixup: Address review  comments."  - commit hash has changed after 
> squash, but these commits were never reviewed,
>
> 5. Author requests another review iteration (PTAL). Since the PR still has 
> a00, previous review comments will not be outdated, and the reviewer can 
> still see the difference since the last review by inspecting b00.
>
> Thanks,
> Valentyn
>
> On Wed, Jul 10, 2019 at 3:26 AM Robert Bradshaw  wrote:
>>
>> On Wed, Jul 10, 2019 at 5:06 AM Kenneth Knowles  wrote:
>> >
>> > My opinion: what is important is that we have a policy for what goes into 
>> > the master commit history. This is very simple IMO: each commit should 
>> > clearly do something that it states, and a commit should do just one thing.
>>
>> Exactly how I feel as well.
>>
>> > Personally, I don't feel a need to set a rule for who does the squashing 
>> > (or non-squashing) or other actions necessary to maintain a clear history.
>>
>> I think it is on the person who does the merge to make sure the
>> history is clean.
>>
>> > In PRs I review the question of who should squash has never come up as an 
>> > issue. Most PRs are either a bunch of random commits obviously meant for 
>> > squash, or carefully managed commits with good messages using the 
>> > git-supported "fixup!" syntax or clear "fixup:" commit messages. It is a 
>> > polarizing issue, which is a good thing in this case as it makes it very 
>> > clear how to merge.
>>
>> This is my experience too.
>>
>> Unfortunately, GitHub only offers squash-everything vs. squash-nothing
>> via the UI.
>>
>> > Your original concern was authors force pushing during review making it 
>> > hard to review. For your point "3. After a force-push, comments made by 
>> > reviewers on earlier commit are hard to find." I thought GitHub had fixed 
>> > that. These comments used to vanish entirely, but now they are still on 
>> > the main PR page IIRC. If it is not fixed, it

Re: [ANNOUNCE] New committer: Robert Burke

2019-07-17 Thread Robert Bradshaw
Congratulations!

On Wed, Jul 17, 2019, 12:56 PM Katarzyna Kucharczyk 
wrote:

> Congratulations! :)
>
> On Wed, Jul 17, 2019 at 12:46 PM Michał Walenia <
> michal.wale...@polidea.com> wrote:
>
>> Congratulations, Robert! :)
>>
>> On Wed, Jul 17, 2019 at 12:45 PM Łukasz Gajowy 
>> wrote:
>>
>>> Congratulations! :)
>>>
>>> Wed, Jul 17, 2019 at 04:30 Rakesh Kumar  wrote:
>>>
 Congrats Rob!!!

 On Tue, Jul 16, 2019 at 10:24 AM Ahmet Altay  wrote:

> Hi,
>
> Please join me and the rest of the Beam PMC in welcoming a new
> committer: Robert Burke.
>
> Robert has been contributing to Beam and actively involved in the
> community for over a year. He has been actively working on Go SDK, helping
> users, and making it easier for others to contribute [1].
>
> In consideration of Robert's contributions, the Beam PMC trusts him
> with the responsibilities of a Beam committer [2].
>
> Thank you, Robert, for your contributions and looking forward to many
> more!
>
> Ahmet, on behalf of the Apache Beam PMC
>
> [1]
> https://lists.apache.org/thread.html/8f729da2d3009059d7a8b2d8624446be161700dcfa953939dd3530c6@%3Cdev.beam.apache.org%3E
> [2] https://beam.apache.org/contribute/become-a-committer
> /#an-apache-beam-committer
>

>>
>> --
>>
>> Michał Walenia
>> Polidea  | Software Engineer
>>
>> M: +48 791 432 002 <+48791432002>
>> E: michal.wale...@polidea.com
>>
>> Unique Tech
>> Check out our projects! 
>>
>


Re: Write-through-cache in State logic

2019-07-16 Thread Robert Bradshaw
Python workers also have a per-bundle SDK-side cache. A protocol has
been proposed, but hasn't yet been implemented in any SDKs or runners.
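
For the curious, the idea is roughly the following (a minimal sketch,
not the proposed protocol nor any SDK's actual implementation; the
client methods are assumptions):

    class WriteThroughStateCache(object):
        """Per-bundle cache in front of a runner state client."""

        def __init__(self, client):
            self._client = client
            self._cache = {}  # discarded at the bundle boundary

        def get(self, key):
            # Serve repeated reads locally after the first fetch.
            if key not in self._cache:
                self._cache[key] = list(self._client.blocking_get(key))
            return self._cache[key]

        def append(self, key, value):
            # Write through: forward to the runner, and keep any cached
            # copy consistent. A blind append must not create a cache
            # entry, since persisted values may not have been fetched.
            if key in self._cache:
                self._cache[key].append(value)
            self._client.blocking_append(key, value)

        def clear(self, key):
            self._cache[key] = []
            self._client.blocking_clear(key)

Reads after the first are then local; correctness hinges on the cache
being dropped (or invalidated) at bundle boundaries.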

On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax  wrote:
>
> It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a 
> cache, though I think it currently has a cap for large bags.
>
> Reuven
>
> On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar  wrote:
>>
>> Hi,
>>
>> I have been using the python sdk for the application and am also using BagState 
>> in production. I was wondering whether the state logic has any write-through 
>> cache implemented or not. If we send every read and write request over the 
>> network, it comes with a performance cost. We could avoid the network call for 
>> a read operation if we had a write-through cache.
>> I have superficially looked into the implementation and I didn't see any 
>> cache implementation.
>>
>> Is it possible to have this cache? Would it cause any issues if we had the 
>> caching layer?
>>


Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-16 Thread Robert Bradshaw
On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov  wrote:
>
> Quick note: I didn't look through the document, but please do not build on 
> either FileBasedSink or FileBasedReader. They are both remnants of the old, 
> non-composable IO world; and in fact much of the composable IO work emerged 
> from frustration with their limitations and recognizing that many other IOs 
> were suffering from the same limitations.
> Instead of FileBasedSink, build on FileIO.write; instead of FileBasedReader, 
> build on FileIO.read.

+1

I think the sink could be written atop FileIO.write, possibly using
dynamic destinations. At the very least the FileSink interface, which
handles the details of writing a single shard, would be an ideal way
to parameterize an SMB sink. It seems that none of our existing IOs
(publicly?) expose FileSink implementations.

FileIO.read is not flexible enough to do the merging. Eugene, is there
a composable analogue to FileSink, for sources, i.e. something that
can turn a file handle (possibly with offsets) into a set of records
other than FileBasedReader?

> On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov  wrote:
>>
>> I share the same concern with Robert regarding re-implementing parts of IO. 
>> At the same time, in the past, I worked on internal libraries that try to 
>> re-use code from existing IO, and it's hardly possible because it feels like 
>> it wasn't designed for re-use. There are a lot of classes that are nested 
>> (non-static) or non-public. I can understand why they were made non-public, 
>> it's a hard abstraction to design well and keep compatibility. As Neville 
>> mentioned, decoupling readers and writers would not only benefit this 
>> proposal but any other use-case that has to deal with low-level APIs such 
>> as the FileSystem API, which is hardly possible today without copy-pasting.
>>
>>
>>
>>
>>
>> On Mon, Jul 15, 2019 at 5:05 PM Neville Li  wrote:
>>>
>>> Re: avoiding mirroring IO functionality, what about:
>>>
>>> - Decouple the nested FileBasedSink.Writer and 
>>> FileBasedSource.FileBasedReader, make them top level and remove references 
>>> to parent classes.
>>> - Simplify the interfaces, while maintaining support for block/offset read 
>>> & sequential write.
>>> - As a bonus, the refactored IO classes can be used standalone in case when 
>>> the user wants to perform custom IO in a DoFn, i.e. a 
>>> PTransform, PCollection>>. Today 
>>> this requires a lot of copy-pasted Avro boilerplate.
>>> - For compatibility, we can delegate to the new classes from the old ones 
>>> and remove them in the next breaking release.
>>>
>>> Re: WriteFiles logic, I'm not sure about generalizing it, but what about 
>>> splitting the part handling writing temp files into a new 
>>> PTransform>>, 
>>> PCollection>>? That splits the bucket-shard 
>>> logic from actual file IO.
>>>
>>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw  
>>> wrote:
>>>>
>>>> I agree that generalizing the existing FileIO may not be the right
>>>> path forward, and I'd only make their innards public with great care.
>>>> (Would this be used like
>>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
>>>> unique in that the source and sink are much more coupled than other
>>>> sources and sinks (which happen to be completely independent, if
>>>> complementary, implementations, whereas SMB attempts to be a kind of
>>>> pipe where one half is instantiated in each pipeline).
>>>>
>>>> In short, an SMB source/sink that is parameterized by an arbitrary,
>>>> existing IO would be ideal (but possibly not feasible (per existing
>>>> prioritizations)), or an SMB source/sink that works as a pair. What
>>>> I'd like to avoid is a set of parallel SMB IO classes that (partially,
>>>> and incompletely) mirror the existing IO ones (from an API
>>>> perspective--how much implementation it makes sense to share is an
>>>> orthogonal issue that I'm sure can be worked out.)
>>>>
>>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li  wrote:
>>>> >
>>>> > Hi Robert,
>>>> >
>>>> > I agree, it'd be nice to reuse FileIO logic of different file types. But 
>>>> > given the current code structure of FileIO & scope of the change, I feel 
>>>> > it's better left for future refactor PRs.
>>>> >
>>>> > Some thoughts:
>

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Robert Bradshaw
I agree that generalizing the existing FileIO may not be the right
path forward, and I'd only make their innards public with great care.
(Would this be used like
SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
unique in that the source and sink are much more coupled than other
sources and sinks (which happen to be completely independent, if
complementary, implementations, whereas SMB attempts to be a kind of
pipe where one half is instantiated in each pipeline).

In short, an SMB source/sink that is parameterized by an arbitrary,
existing IO would be ideal (but possibly not feasible (per existing
prioritizations)), or an SMB source/sink that works as a pair. What
I'd like to avoid is a set of parallel SMB IO classes that (partially,
and incompletely) mirror the existing IO ones (from an API
perspective--how much implementation it makes sense to share is an
orthogonal issue that I'm sure can be worked out.)

On Mon, Jul 15, 2019 at 4:18 PM Neville Li  wrote:
>
> Hi Robert,
>
> I agree, it'd be nice to reuse FileIO logic of different file types. But 
> given the current code structure of FileIO & scope of the change, I feel it's 
> better left for future refactor PRs.
>
> Some thoughts:
> - SMB file operation is simple single file sequential reads/writes, which 
> already exists as Writer & FileBasedReader but are private inner classes, and 
> have references to the parent Sink/Source instance.
> - The readers also have extra offset/split logic but that can be worked 
> around.
> - It'll be nice to not duplicate temp->destination file logic but again 
> WriteFiles is assuming a single integer shard key, so it'll take some 
> refactoring to reuse it.
>
> All of these can be done in backwards compatible way. OTOH generalizing the 
> existing components too much (esp. WriteFiles, which is already complex) 
> might lead to two logic paths, one specialized for the SMB case. It might be 
> easier to decouple some of them for better reuse. But again I feel it's a 
> separate discussion.
>
> On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty  
> wrote:
>>
>> Thanks Robert!
>>
>> We'd definitely like to be able to re-use existing I/O components--for 
>> example the Writer/FileBasedReader (since they 
>> operate on a WritableByteChannel/ReadableByteChannel, which is the level of 
>> granularity we need) but the Writers, at least, seem to be mostly 
>> private-access. Do you foresee them being made public at any point?
>>
>> - Claire
>>
>> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw  wrote:
>>>
>>> I left some comments on the doc.
>>>
>>> I think the general idea is sound, but one thing that worries me is
>>> the introduction of a parallel set of IOs that mirrors the (existing)
>>> FileIOs. I would suggest either (1) incorporate this functionality
>>> into the generic FileIO infrastructure, or let it be parameterized by
>>> arbitrary IO (which I'm not sure is possible, especially for the Read
>>> side (and better would be the capability of supporting arbitrary
>>> sources, aka an optional "as-sharded-source" operation that returns a
>>> PTransform<..., KV>> where the iterable is
>>> promised to be in key order)) or support a single SMB aka
>>> "PreGrouping" source/sink pair that's always used together (and whose
>>> underlying format is not necessarily public).
>>>
>>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li  wrote:
>>> >
>>> > 4 people have commented, but mostly on clarifying details and not much on 
>>> > the overall design.
>>> >
>>> > It'd be great to have thumbs up/down on the design, specifically 
>>> > metadata, bucket & shard strategy, etc., since that affects backwards 
>>> > compatibility of output files.
>>> > Some breaking changes, e.g. dynamic # of shards, are out of scope for V1 
>>> > unless someone feels strongly about it. The current scope should cover 
>>> > all our use cases and leave room for optimization.
>>> >
>>> > Once green lighted we can start adopting internally, ironing out rough 
>>> > edges while iterating on the PRs in parallel.
>>> >
>>> > Most of the implementation is self-contained in the extensions:smb 
>>> > module, except making a few core classes/methods public for reuse. So 
>>> > despite the amount of work it's still fairly low risk to the code base. 
>>> > There're some proposed optimization & refactoring involving core (see 
>>> > appendix) but IMO they're better left for followup PRs.
>

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Robert Bradshaw
>>>> By pre-grouping/sorting data and writing to bucket/shard output files, the 
>>>> consumer can sort/merge matching ones without a CoGBK. Essentially we're 
>>>> paying the shuffle cost upfront to avoid them repeatedly in each consumer 
>>>> pipeline that wants to join data.
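
The consumer-side merge described above is the classic sorted-merge
join. A sketch of the two-input case in plain Python, where each input
is an iterator of (key, value) pairs read sequentially from matching
buckets, already sorted by key (illustrative only, not the PR's actual
code):

    import itertools

    def sort_merge_join(lhs, rhs):
        """Yield (key, lhs_values, rhs_values) without any shuffle."""
        l_groups = itertools.groupby(lhs, key=lambda kv: kv[0])
        r_groups = itertools.groupby(rhs, key=lambda kv: kv[0])
        l, r = next(l_groups, None), next(r_groups, None)
        while l is not None or r is not None:
            if r is None or (l is not None and l[0] < r[0]):
                yield l[0], [v for _, v in l[1]], []
                l = next(l_groups, None)
            elif l is None or r[0] < l[0]:
                yield r[0], [], [v for _, v in r[1]]
                r = next(r_groups, None)
            else:
                yield l[0], [v for _, v in l[1]], [v for _, v in r[1]]
                l, r = next(l_groups, None), next(r_groups, None)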
>>>>
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li  wrote:
>>>>>>
>>>>>> Ping again. Any chance someone takes a look to get this thing going? 
>>>>>> It's just a design doc and basic metadata/IO impl. We're not talking 
>>>>>> about actual source/sink code yet (already done but saved for future 
>>>>>> PRs).
>>>>>>
>>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:
>>>>>>>
>>>>>>> Thank you Claire, this looks promising. Explicitly adding a few folks 
>>>>>>> that might have feedback: +Ismaël Mejía +Robert Bradshaw +Lukasz Cwik 
>>>>>>> +Chamikara Jayalath
>>>>>>>
>>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty 
>>>>>>>  wrote:
>>>>>>>>
>>>>>>>> Hey dev@!
>>>>>>>>
>>>>>>>> Myself and a few other Spotify data engineers have put together a 
>>>>>>>> design doc for SMB Join support in Beam, and have a working Java 
>>>>>>>> implementation we've started to put up for PR ([0], [1], [2]). There's 
>>>>>>>> more detailed information in the document, but the tl;dr is that SMB 
>>>>>>>> is a strategy to optimize joins for file-based sources by modifying 
>>>>>>>> the initial write operation to write records in sorted buckets based 
>>>>>>>> on the desired join key. This means that subsequent joins of datasets 
>>>>>>>> written in this way are only sequential file reads, no shuffling 
>>>>>>>> involved. We've seen some pretty substantial performance speedups with 
>>>>>>>> our implementation and would love to get it checked in to Beam's Java 
>>>>>>>> SDK.
>>>>>>>>
>>>>>>>> We'd appreciate any suggestions or feedback on our proposal--the 
>>>>>>>> design doc should be public to comment on.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Claire / Neville


Re: [python] ReadFromPubSub broken in Flink

2019-07-15 Thread Robert Bradshaw
On Mon, Jul 15, 2019 at 5:42 AM Chamikara Jayalath  wrote:
>
> On Sat, Jul 13, 2019 at 7:41 PM Chad Dombrova  wrote:
>>
>> Hi Chamikara,

 why not make this part of the pipeline options?  does it really need to 
 vary from transform to transform?
>>>
>>> It's possible for the same pipeline to connect to multiple expansion 
>>> services, to use transforms from more than one SDK language and/or version.
>>
>> There are only 3 languages supported, so excluding the user’s chosen language 
>> we’re only talking about 2 options (i.e. for python, they’re java and go). 
>> The reality is that Java provides the superset of all the IO functionality 
>> of Go and Python, and the addition of external transforms is only going to 
>> discourage the addition of more native IO transforms in python and go (which 
>> is ultimately a good thing!). So it seems like a poor UX choice to make 
>> users provide the expansion service to every single external IO transform 
>> when the reality is that 99.9% of the time it’ll be the same url for any 
>> given pipeline. Correct me if I’m wrong, but in a production scenario the 
>> expansion service would not be the current default, localhost:8097, correct? 
>> That means users would always need to specify this arg.
>>
>> Here’s an alternate proposal: instead of providing the expansion service as 
>> a URL in a transform’s __init__ arg, i.e. 
>> expansion_service='localhost:8097', make it a symbolic name, like 
>> expansion_service='java' (an external transform is bound to a particular 
>> source SDK, e.g. KafkaIO is bound to Java, so this default seems reasonable 
>> to me). Then provide a pipeline option to specify the url of an expansion 
>> service alias in the form alias@url (e.g. 
>> --expansion_service=java@myexpansionservice:8097).

I like the idea of providing symbolic references, though there's a
question of resolution. Something like "java" would likely be
insufficient, as that would require the "java" service to build in
every IO and library (which may not even be possible, due to diamond
dependency problems, even if it were desirable (could be very
wasteful), but a reasonable subset could have big wins). The lifetime
of services is another question that is as-yet unresolved, and as you
point out this is very much a UX question as well.
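
Concretely, resolving the proposed alias@url flag could be as simple as
the following (a hypothetical sketch of Chad's proposal, not an
existing Beam option):

    def parse_expansion_service_aliases(flag_values):
        """['java@myexpansionservice:8097'] -> {'java': 'myexpansionservice:8097'}"""
        aliases = {}
        for value in flag_values:
            alias, _, url = value.partition('@')
            aliases[alias] = url
        return aliases

The open question, as noted above, is what an alias like "java" should
actually resolve to.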

Really, what I think is missing is the notion of "service (aka
cross-language library) as a release artifact." This will consist of a
global, versioned identification of the code (and supporting data)
itself (e.g. a jar or docker image) plus some parameters for how to
start it up (hopefully minimal if we have some good conventions).
We're tackling the same issue with workers (environment
specifications), runners (I've been working on
https://github.com/apache/beam/pull/9043) and now expansion services.

This is starting to sound more like a topic of discussion for the dev
list. Added.

> Right, I was pointing out that this is a per-transform option, not a 
> per-pipeline option. Also, the exact interface of a transform's __init__ is up to 
> transform authors, so I don't think the Beam SDK can/should dictate that all 
> external transforms use this notation. Also, this will require all external 
> transform implementations to internally use a pre-specified pipeline option 
> when construction parameters (for example, "expansion_service") have certain 
> values (for example, "java"). This again cannot be enforced.
>>>
>>> Are you talking about key/value coders of the Kafka external transform ?
>>> The story of coders is a bit complicated for cross-language transforms. Even if we 
>>> get a bytestring from Java, how can we make sure that it is processable 
>>> in Python ? For example, it might be a serialized Java object.
>>
>> IIUC, it’s not as if you support that with the current design, do you? If 
>> it’s a Java object that your native IO transform decodes in Java, then how 
>> are you going to get that to Python? Presumably the reason it’s encoded as a 
>> Java object is because it can’t be represented using a cross-language coder.
>
>
> This was my point.  There's no easy way to map bytes produced by an external 
> transform to a Python coder. So we have to go through standard coders (or 
> common formats) at SDK boundaries until we have portable schema support.
>
>> On the other hand, if I’m authoring a beam pipeline in python using an 
>> external transform like PubSubIO, then it’s desirable for me to write a 
>> pickled python object to WriteToPubSub and get that back in a ReadFromPubSub 
>> in another python-based pipeline. In other words, when it comes to coders, 
>> it seems we should be favoring the language that is using the external 
>> transform, rather than the native language of the transform itself.
>
> I see. Note that bytes is one of the standard coders. So if you want to 
> generate bytes to be written to PubSub in the Python SDK and write those bytes to 
> PubSub using a cross-language PubSub sink transform, that will work.
>
>>
>> All 

Re: pickling typing types in Python 3.5+

2019-07-10 Thread Robert Bradshaw
I looked into CloudPickle a while back, and would be supportive of the change.
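
For reference, the failure and the cloudpickle behavior can be
reproduced in a few lines (per the issues linked below; this assumes a
cloudpickle version recent enough to include the fix):

    import pickle
    import typing

    try:
        pickle.dumps(typing.List[int])
    except Exception as e:  # raises on CPython 3.5/3.6, works on 3.7+
        print('stdlib pickle failed:', e)

    import cloudpickle
    restored = cloudpickle.loads(cloudpickle.dumps(typing.List[int]))
    print('cloudpickle round-trip:', restored)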

On Mon, Jul 1, 2019 at 11:06 PM Valentyn Tymofieiev  wrote:
>
> I have checked that cloudpickle (an alternative to dill) is able to pickle 
> and unpickle typing types on Python 3.5, 3.6, which seems to be a recent 
> change, see: 
> https://github.com/cloudpipe/cloudpickle/issues/63#issuecomment-501624383.
>
> I am evaluating cloudpickle as a potential avenue to address several other 
> issues we found in Beam while working on Python 3 support, such as:
>
> https://issues.apache.org/jira/browse/BEAM-6522
> https://issues.apache.org/jira/browse/BEAM-7284
> https://issues.apache.org/jira/browse/BEAM-5878?focusedCommentId=16834554
> https://github.com/uqfoundation/dill/issues/300
> https://issues.apache.org/jira/browse/BEAM-7540
>
> Once I have more information on cloudpickle vs dill in Beam, I'll bring it to 
> the mailing list.
>
> On Wed, May 15, 2019 at 5:25 AM Robert Bradshaw  wrote:
>>
>> (2) seems reasonable.
>>
>> On Tue, May 14, 2019 at 3:15 AM Udi Meiri  wrote:
>> >
>> > It seems like pickling of typing types is broken in 3.5 and 3.6, fixed in 
>> > 3.7:
>> > https://github.com/python/typing/issues/511
>> >
>> > Here are my attempts:
>> > https://gist.github.com/udim/ec213305ca865390c391001e8778e91d
>> >
>> >
>> > My ideas:
>> > 1. I know that we override type object handling in pickler.py 
>> > (_nested_type_wrapper), and perhaps this mechanism can be used to pickle 
>> > typing classes correctly. The question is how.
>> >
>> > 2. Exclude/stub out these classes when pickling a pipeline - they are only 
>> > used for verification during pipeline construction anyway. This could be a 
>> > temporary solution for versions 3.5 and 3.6.
>> >
>> > Any ideas / opinions?


Re: [DISCUSS] Contributor guidelines for iterating on PRs: when to squash commits.

2019-07-10 Thread Robert Bradshaw
On Wed, Jul 10, 2019 at 5:06 AM Kenneth Knowles  wrote:
>
> My opinion: what is important is that we have a policy for what goes into the 
> master commit history. This is very simple IMO: each commit should clearly do 
> something that it states, and a commit should do just one thing.

Exactly how I feel as well.

> Personally, I don't feel a need to set a rule for who does the squashing (or 
> non-squashing) or other actions necessary to maintain a clear history.

I think it is on the person who does the merge to make sure the
history is clean.

> In PRs I review the question of who should squash has never come up as an 
> issue. Most PRs are either a bunch of random commits obviously meant for 
> squash, or carefully managed commits with good messages using the 
> git-supported "fixup!" syntax or clear "fixup:" commit messages. It is a 
> polarizing issue, which is a good thing in this case as it makes it very 
> clear how to merge.

This is my experience too.

Unfortunately, GitHub only offers squash-everything vs. squash-nothing
via the UI.

> Your original concern was authors force pushing during review making it hard 
> to review. For your point "3. After a force-push, comments made by reviewers 
> on earlier commit are hard to find." I thought GitHub had fixed that. These 
> comments used to vanish entirely, but now they are still on the main PR page 
> IIRC. If it is not fixed, it would make sense to add this to the contribution 
> guide, and even to the PR template.

I find it harder to review when authors force-push as well. When
commits are separate, the reviewer can choose what subset of changes
(including all of them, or just since the last review) to inspect, but
when they're squashed this ability is lost (and comments, though now
not dropped, don't tie back to code). I would be in favor of a
recommendation not to force-push *reviewed* commits during review.

I think this also requires committers to make a conscious effort when
merging (which is not being consistently done now). Something like a
simple github hook that advises (based on the commit messages) which
would be best could go a long way here.


> On Tue, Jul 9, 2019 at 2:18 PM Valentyn Tymofieiev  
> wrote:
>>
>> Ok, I think if authors mark fixup commits with "fixup" prefix and committers 
>> routinely fixup commits before the merge without asking the contributors to 
>> do so, the authors should not have a particular reason to fixup/squash + 
>> force-push all changes into one commit after addressing review comments. 
>> This will make the review easier, however committers will have to take 
>> responsibility for merging fixup commits.
>>
>> Currently both committer guide[1] and contributor guide[2] assume that it is 
>> the author's responsibility to merge fixup commits.
>>
>>> The reviewer should give the LGTM and then request that the author of the 
>>> pull request rebase, squash, split, etc, the commit
>>
>>
>>> "After review is complete and the PR accepted, multiple commits should be 
>>> squashed (see Git workflow tips)".
>>
>>
>> Should we explicitly make squashing review-related commits a responsibility 
>> of committers?
>>
>> [1] https://beam.apache.org/contribute/committer-guide
>> [2] https://beam.apache.org/contribute/
>>
>>
>> On Tue, Jul 9, 2019 at 12:22 PM Rui Wang  wrote:
>>>
>>> "allow maintainers to edit" by default is enabled. Then the proposed 
>>> workflow looks reasonable to me now.
>>>
>>>
>>> -Rui
>>>
>>> On Tue, Jul 9, 2019 at 11:26 AM Kenneth Knowles  wrote:

 If you "allow maintainers to edit" the PR, it is easy for any committer to 
 fix up the commits and merge. They should not have to ask you to do it, 
 unless it is not obvious what to do.

 Kenn

 On Tue, Jul 9, 2019 at 11:05 AM Rui Wang  wrote:
>
> At least for me, because I usually don't know when a PR review is done, in 
> order to get a PR merged into the Beam repo faster, I keep squashing 
> commits every time so that committers can review and then merge in one 
> step; otherwise committers could approve a PR but then ask for squashed 
> commits, which leads to another ping-and-wait round.
> 
> Thus I prefer that committers do squash and merge, which will reduce PR 
> authors' load during the PR review process.
>
>
> -Rui
>
>
> On Mon, Jul 8, 2019 at 5:44 PM Valentyn Tymofieiev  
> wrote:
>>
>> Rui, committer guide[1] does say that all commits are standalone changes:
>>
>>> We prefer small independent, incremental PRs with descriptive, isolated 
>>> commits. Each commit is a single clear change.
>>
>>
>> However in my opinion, this recommendation applies to moments when a PR 
>> is first sent for review, and when a PR is being merged. Committer guide 
>> also mentions that during review iterations authors may add 
>> review-related commits.
>>
>>> the pull request may have a collection of review-related 

Re: Accumulating mode implies that panes are processed in order?

2019-06-26 Thread Robert Bradshaw
On Thu, Jun 27, 2019 at 1:52 AM Rui Wang  wrote:
>>
>>
>>  AFAIK all streaming runners today practically do provide these panes in 
>> order;
>
> Does it refer to "the stage immediately after GBK itself processes fired 
> panes in order" in streaming runners? Could you share more information?
>
>
>>
>> this means that likely many users implicitly rely on this "non guarantee," 
>> probably without even knowing they are relying on it.
>
> If streaming runners have already provided or processed panes in order, and 
> likely many users rely on it already, why not make the order of panes a part of 
> the model explicitly?

Most runners produce panes in order, but they don't necessarily
preserve the order downstream (at least beyond what's fused into the
same stage, which is where it gets difficult).

