Re: Flink 1.6 Support

2018-10-30 Thread Jins George
Thank you Thomas. The idea of providing different build targets for runners is
great, as it lets users pick from a list of runner versions.

Thanks

Jins George

On 10/30/18 12:36 PM, Thomas Weise wrote:
There has not been any decision to move to 1.6.x for the next release yet.

There has been related general discussion about upgrading runners recently [1]

Overall we need to consider the support for newer Flink versions that users 
find (the Flink version in distributions and what users typically have in their 
deployment stacks). These upgrades are not automatic/cheap/fast, so there is a 
balance to strike.

The good news is that with Beam 2.8.0 you should be able to make a build for 
1.6.x with just a version number change [2]  (Other compile differences have 
been cleaned up.)

[1] 
https://lists.apache.org/thread.html/0588ed783767991aa36b00b8529bbd29b3a8958ee6e82fca83ac2938@%3Cdev.beam.apache.org%3E
[2] https://github.com/apache/beam/blob/v2.8.0/runners/flink/build.gradle#L49


On Tue, Oct 30, 2018 at 10:50 AM Lukasz Cwik wrote:
+dev

On Tue, Oct 30, 2018 at 10:30 AM Jins George wrote:
Hi Community,

Noticed that the Beam 2.8 release comes with a Flink 1.5.x dependency.
Are there any plans to upgrade Flink to 1.6.x in the next Beam release?
(I am looking for the better k8s support in Flink 1.6.)

Thanks,

Jins George



Re: Follow up ideas, to simplify creating MonitoringInfos.

2018-10-30 Thread Alex Amato
I am not sure of the correct syntax to populate the instances of my
MonitoringInfoSpec messages.

message MonitoringInfoSpec {

string urn = 1;

string type_urn = 2;

repeated string required_labels = 3;

* map annotations = 4;*

}


Notice how the annotations field is not used anywhere. I was unable to get
this to compile and could find no examples of this on the proto github.
Perhaps I'll have to reach out to them. I was wondering if anyone here was
familiar first.


message MonitoringInfoSpecs {

enum MonitoringInfoSpecsEnum {

  USER_COUNTER = 0 [(monitoring_info_spec) = {

urn: "beam:metric:user",

type_urn: "beam:metrics:sum_int_64",

  }];


  ELEMENT_COUNT = 1 [(monitoring_info_spec) = {

urn: "beam:metric:element_count:v1",

type_urn: "beam:metrics:sum_int_64",

required_labels: ["PTRANSFORM"],

  }];


  START_BUNDLE_MSECS = 2 [(monitoring_info_spec) = {

urn: "beam:metric:pardo_execution_time:start_bundle_msecs:v1",

type_urn: "beam:metrics:sum_int_64",

required_labels: ["PTRANSFORM"],

  }];


  PROCESS_BUNDLE_MSECS = 3 [(monitoring_info_spec) = {

urn: "beam:metric:pardo_execution_time:process_bundle_msecs:v1",

type_urn: "beam:metrics:sum_int_64",

required_labels: ["PTRANSFORM"],

  }];


  FINISH_BUNDLE_MSECS = 4 [(monitoring_info_spec) = {

urn: "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",

type_urn: "beam:metrics:sum_int_64",

required_labels: ["PTRANSFORM"],

  }];


  TOTAL_MSECS = 5 [(monitoring_info_spec) = {

urn: "beam:metric:ptransform_execution_time:total_msecs:v1",

type_urn: "beam:metrics:sum_int_64",

required_labels: ["PTRANSFORM"],

  }];

}

}




On Tue, Oct 30, 2018 at 2:01 PM Lukasz Cwik  wrote:

> I'm not sure what you mean by "Using a map in an option."
>
> For your second issue, the google docs around this show[1]:
>
> Note that if you want to use a custom option in a package other than the
> one in which it was defined, you must prefix the option name with the
> package name, just as you would for type names. For example:
>
> // foo.proto
> import "google/protobuf/descriptor.proto";
> package foo;
> extend google.protobuf.MessageOptions {
>   optional string my_option = 51234;
> }
>
> // bar.proto
> import "foo.proto";
> package bar;
> message MyMessage {
>   option (foo.my_option) = "Hello world!";
> }
>
>
> 1: https://developers.google.com/protocol-buffers/docs/proto#customoptions
>
>
> On Mon, Oct 29, 2018 at 5:19 PM Alex Amato  wrote:
>
>> Hi Robert and community, :)
>>
>> I was starting to code this up, but I wasn't sure exactly how to do some
>> of the proto syntax. Would you mind taking a look at this doc
>> and let me know if you know how to resolve any of these issues:
>>
>>    - Using a map in an option.
>>    - Referring to string "constants" from other enum options in .proto
>>      files.
>>
>> Please see the comments I have listed in the doc,
>> and a few alternative suggestions.
>> Thanks
>>
>> On Wed, Oct 24, 2018 at 10:08 AM Alex Amato  wrote:
>>
>>> Okay. That makes sense. Using runtime validation and protos is what I
>>> was thinking as well.
>>> I'll include you as a reviewer in my PRs.
>>>
>>> As for the choice of a builder/constructor/factory: if we go with
>>> factory methods/constructor then we will need a method for each metric type
>>> (intCounter, latestInt64, ...). Plus, then I don't think we can have any
>>> constructor parameters for labels, we will just need to accept a dictionary
>>> for label keys+values like you say. This is because we cannot offer a
>>> method for each URN and its combination of labels, and we should avoid such
>>> static detection, as you say.
>>>
>>> The builder however, allows us to add a method for setting each label.
>>> Since there are a small number of labels I think this should be fine. A
>>> specific metric URN will have a specific set of labels which we can
>>> validate being set. Any variant of this should use a different label (or a
>>> new version in the label). Thus, the builder can give an advantage, making
>>> it easier to set label values without needing to provide the correct key
>>> for the label, when it's set. You just need to call the correct method.
>>>
>>> Perhaps it might be best to leave this open to each SDK based on its
>>> language style (Builder, Factory, etc.), but make sure we use the
>>> protos+runtime validation.
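
The builder idea described above could look roughly like the following. This is only a sketch under stated assumptions: the class name, the setter names, and the dictionary returned by build() are hypothetical and are not an existing Beam API; only the URNs and the PTRANSFORM label come from this thread.

```python
# Hypothetical sketch: class, method names, and output shape are illustrative
# assumptions, not an existing Beam API.
REQUIRED_LABELS = {
    "beam:metric:user": set(),
    "beam:metric:element_count:v1": {"PTRANSFORM"},
}


class MonitoringInfoBuilder:
    def __init__(self, urn):
        self._urn = urn
        self._labels = {}
        self._value = None

    def set_ptransform_label(self, ptransform_id):
        # One setter per label: callers never spell the label key themselves.
        self._labels["PTRANSFORM"] = ptransform_id
        return self

    def set_int64_value(self, value):
        self._value = int(value)
        return self

    def build(self):
        # Runtime validation: every label required for this URN must be set.
        required = REQUIRED_LABELS.get(self._urn, set())
        missing = required - set(self._labels)
        if missing:
            raise ValueError(
                "missing labels for %s: %s" % (self._urn, sorted(missing)))
        return {
            "urn": self._urn,
            "labels": dict(self._labels),
            "value": self._value,
        }


info = (MonitoringInfoBuilder("beam:metric:element_count:v1")
        .set_ptransform_label("MyTransform")
        .set_int64_value(42)
        .build())
```

Calling build() without first setting the PTRANSFORM label raises ValueError in this sketch, which is the runtime check that stands in for static detection.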
>>>
>>> On Wed, Oct 24, 2018 at 7:02 AM Robert Bradshaw 
>>> wrote:
>>>
>>>> Thanks for bringing this to the list; it's a good question.
>>>>
>>>> I think the difficulty comes from trying to statically define a list
>>>> of possibilities that should instead be runtime values. E.g.
>>>> currently we're up to about a dozen distinct types, and having a
>>>> setter for each is both verbose and

Re: error with DirectRunner

2018-10-30 Thread Udi Meiri
+Robert Bradshaw  I would be happy to debug and fix
this, but I'd need more guidance on where to look.

On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri  wrote:

> Created https://issues.apache.org/jira/browse/BEAM-5927
>
> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik  wrote:
>
>> Udi, do you know if we have a bug tracking this issue?
>>
>> If not, can you file one referencing this e-mail thread?
>>
>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen  wrote:
>>
>>> Thanks Udi. I agree, since it works fine removing either the side input
>>> or the last flatten and combine operation.
>>>
>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri  wrote:
>>>
>>>> This looks like a FnApiRunner bug.
>>>> When I override use_fnapi_runner = False in direct_runner.py the
>>>> pipeline works.
>>>>
>>>> It seems like either the side-input to _copy_number or the Flatten
>>>> operation is the culprit.
>>>>
>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a project that started failing with DirectRunner, but works
>>>>> well using DataflowRunner (last working version is 2.4). The error message
>>>>> I received is:
>>>>> line 1088, in run_stage
>>>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>
>>>>> I have simplified the pipeline to the following example. Can someone
>>>>> please take a look? Many thanks!
>>>>>
>>>>> Allie
>>>>>
>>>>>
>>>>> import apache_beam as beam
>>>>> import argparse
>>>>> from apache_beam import transforms
>>>>> from apache_beam import pvalue
>>>>> from apache_beam.options import pipeline_options
>>>>>
>>>>>
>>>>> def _copy_number(number, side=None):
>>>>>   yield number
>>>>>
>>>>>
>>>>> def fn_sum(values):
>>>>>   return sum(values)
>>>>>
>>>>>
>>>>> def run(argv=None):
>>>>>   parser = argparse.ArgumentParser()
>>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>   numbers = [1, 2]
>>>>>   with beam.Pipeline(options=options) as p:
>>>>>     sum_1 = (p
>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>
>>>>>     sum_2 = (p
>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>
>>>>>     _ = ((sum_1, sum_2)
>>>>>          | beam.Flatten()
>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>>>




Re: error with DirectRunner

2018-10-30 Thread Udi Meiri
Created https://issues.apache.org/jira/browse/BEAM-5927

On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik  wrote:

> Udi, do you know if we have a bug tracking this issue?
>
> If not, can you file one referencing this e-mail thread?
>
> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen  wrote:
>
>> Thanks Udi. I agree, since it works fine removing either the side input
>> or the last flatten and combine operation.
>>
>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri  wrote:
>>
>>> This looks like a FnApiRunner bug.
>>> When I override use_fnapi_runner = False in direct_runner.py the
>>> pipeline works.
>>>
>>> It seems like either the side-input to _copy_number or the Flatten
>>> operation is the culprit.
>>>
>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a project that started failing with DirectRunner, but works well
>>>> using DataflowRunner (last working version is 2.4). The error message I
>>>> received is:
>>>> line 1088, in run_stage
>>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>
>>>> I have simplified the pipeline to the following example. Can someone
>>>> please take a look? Many thanks!
>>>>
>>>> Allie
>>>>
>>>>
>>>> import apache_beam as beam
>>>> import argparse
>>>> from apache_beam import transforms
>>>> from apache_beam import pvalue
>>>> from apache_beam.options import pipeline_options
>>>>
>>>>
>>>> def _copy_number(number, side=None):
>>>>   yield number
>>>>
>>>>
>>>> def fn_sum(values):
>>>>   return sum(values)
>>>>
>>>>
>>>> def run(argv=None):
>>>>   parser = argparse.ArgumentParser()
>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>   numbers = [1, 2]
>>>>   with beam.Pipeline(options=options) as p:
>>>>     sum_1 = (p
>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>
>>>>     sum_2 = (p
>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>
>>>>     _ = ((sum_1, sum_2)
>>>>          | beam.Flatten()
>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))








Re: [PROPOSAL] ParquetIO support for Python SDK

2018-10-30 Thread Heejong Lee
Thanks all for the valuable feedback on the document. Here's the summary of
planned features for ParquetIO Python SDK:

   - Can read from Parquet files on any storage system supported by Beam
   - Can write to Parquet files on any storage system supported by Beam
   - Can configure the compression algorithm of output files
   - Can adjust the size of the row group
   - Can read multiple row groups in a single file in parallel (source
     splitting)
   - Can partially read by columns


It introduces a new dependency, pyarrow, for Parquet reading and writing
operations.

If you're interested, you can review and test the PR
https://github.com/apache/beam/pull/6763

Thanks,

On Wed, Oct 24, 2018 at 5:37 PM Chamikara Jayalath 
wrote:

> Thanks Heejong. Added some comments. +1 for summarizing the doc in the
> email thread.
>
> - Cham
>
> On Wed, Oct 24, 2018 at 4:45 PM Ahmet Altay  wrote:
>
>> Thank you Heejong. Could you also share a summary of the design document
>> (major points/decisions) in the mailing list?
>>
>> On Wed, Oct 24, 2018 at 4:08 PM, Heejong Lee  wrote:
>>
>>> Hi,
>>>
>>> I'm working on BEAM-: Parquet IO for Python SDK.
>>>
>>> Issue: https://issues.apache.org/jira/browse/BEAM-
>>> Design doc:
>>> https://docs.google.com/document/d/1-FT6zmjYhYFWXL8aDM5mNeiUnZdKnnB021zTo4S-0Wg
>>> WIP PR: https://github.com/apache/beam/pull/6763
>>>
>>> Any feedback is appreciated. Thanks!
>>>
>>>
>>


Re: [FYI] Jenkins is restarting, please do not merge PRs without validation

2018-10-30 Thread Rui Wang
Thanks for this heads up!

-Rui

On Tue, Oct 30, 2018 at 1:50 PM Scott Wegner  wrote:

>
> https://lists.apache.org/thread.html/d8704933befb71b5753dae8eaab7372f00a72307d66af86c120d79b8@%3Cbuilds.apache.org%3E
>
> INFRA is currently restarting Jenkins. While it is down, precommit checks
> will not run on GitHub PR's. Please do not merge PR's without validation.
>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback
>


Re: [PROPOSAL] Additional design for the Beam Python State and Timers API

2018-10-30 Thread Kenneth Knowles
Yea, I would expect A.
B would be ill-defined for processing time timers, and trouble for event
time timers once we decouple firing time and effective timestamp.
C could easily be very confusing; generally automatic window assignment
outside the Window transform is weird. The timestamp has to be < end of
window anyhow to be a well-formed timer.

Kenn

On Tue, Oct 30, 2018 at 1:40 PM Lukasz Cwik  wrote:

> My concerns are around item 4 (left the same comments in the doc).
>
> What window should timers be using when looking up a side input?
>
> A) The window corresponding to the element that set the original timer.
> B) The window that would have been assigned based upon when the timer is
> scheduled to fire.
> C) The window that would have been assigned from the "output" watermark
> hold time
>
> A makes the most sense to me since it represents the side input that the
> original element would have accessed. This allows people to schedule a
> timer to "wait" for a side input refresh. It also handles the side input
> push back issue.
>
> I'm not sure if B or C would allow different useful user scenarios that
> you would not be able to capture otherwise.
>
> How do any of these strategies impact the side input garbage collection?
>
> On Fri, Oct 26, 2018 at 9:47 AM Kenneth Knowles  wrote:
>
>> It all sounds very useful but I have basic concerns about item 1. The doc
>> doesn't really seem to go into the design concerns that I have in mind.
>>
>>  - map / flatMap are universal functions with definitions that we don't
>> own and shouldn't violate
>>  - corollary: map / flatMap have per element parallelism with no
>> dependencies between them
>>  - the doc says "map is just implemented as DoFn.process()" which is
>> implementation, not the spec
>>
>> So suggestions:
>>
>>  - how about just giving it a new name, making it clear it is neither map
>> nor flatMap and does not have per-element parallelism
>>  - spec out the functionality without reference to DoFn
>>  - be explicit about what determines the maximum parallelism / which
>> elements are required to be processed serially (generally key+window)
>>
>> Kenn
>>
>> On Fri, Oct 26, 2018 at 2:49 AM Robert Bradshaw 
>> wrote:
>>
>>> Thanks. They make sense to me (and would have been handy when I was
>>> writing the state tests for the Fn API).
>>> On Fri, Oct 26, 2018 at 10:48 AM Charles Chen  wrote:
>>> >
>>> > Hey there,
>>> >
>>> > A while back, I shared the Beam Python State and Timers API proposal (
>>> https://s.apache.org/beam-python-user-state-and-timers) with this list
>>> [1]; we reached consensus on the features proposed there and I implemented
>>> the API surface described there, along with the reference DirectRunner
>>> implementation and some shared code for executing such pipelines (see for
>>> example https://github.com/apache/beam/pull/5691 and
>>> https://github.com/apache/beam/pull/6304).
>>> >
>>> > I would like to propose some additional design considerations to
>>> improve upon the previous State / Timer API proposal as described here (on
>>> pages 14-18 that I have appended to the doc):
>>> https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.10nb33sz7u16
>>> >
>>> > Briefly, these are the additional design considerations proposed to
>>> improve upon the API:
>>> >
>>> > Allowing access to user state / timers in Map / FlatMap callables /
>>> lambdas
>>> > Allowing access to the key during the timer callback
>>> > Allowing access to auxiliary timer data
>>> > Allowing access to side inputs during the timer callback
>>> >
>>> > I would really appreciate any feedback you may have.  Thanks!
>>> >
>>> > Best,
>>> > Charles
>>> >
>>> >
>>> > [1] Previous dev@ discussion thread:
>>> https://lists.apache.org/thread.html/51ba1a00027ad8635bc1d2c0df805ce873995170c75d6a08dfe21997@%3Cdev.beam.apache.org%3E
>>>
>>


Re: Follow up ideas, to simplify creating MonitoringInfos.

2018-10-30 Thread Lukasz Cwik
I'm not sure what you mean by "Using a map in an option."

For your second issue, the google docs around this show[1]:

Note that if you want to use a custom option in a package other than the
one in which it was defined, you must prefix the option name with the
package name, just as you would for type names. For example:

// foo.proto
import "google/protobuf/descriptor.proto";
package foo;
extend google.protobuf.MessageOptions {
  optional string my_option = 51234;
}

// bar.proto
import "foo.proto";
package bar;
message MyMessage {
  option (foo.my_option) = "Hello world!";
}


1: https://developers.google.com/protocol-buffers/docs/proto#customoptions


On Mon, Oct 29, 2018 at 5:19 PM Alex Amato  wrote:

> Hi Robert and community, :)
>
> I was starting to code this up, but I wasn't sure exactly how to do some
> of the proto syntax. Would you mind taking a look at this doc
> and let me know if you know how to resolve any of these issues:
>
>    - Using a map in an option.
>    - Referring to string "constants" from other enum options in .proto
>      files.
>
> Please see the comments I have listed in the doc,
> and a few alternative suggestions.
> Thanks
>
> On Wed, Oct 24, 2018 at 10:08 AM Alex Amato  wrote:
>
>> Okay. That makes sense. Using runtime validation and protos is what I was
>> thinking as well.
>> I'll include you as a reviewer in my PRs.
>>
>> As for the choice of a builder/constructor/factory: if we go with factory
>> methods/constructor then we will need a method for each metric type
>> (intCounter, latestInt64, ...). Plus, then I don't think we can have any
>> constructor parameters for labels, we will just need to accept a dictionary
>> for label keys+values like you say. This is because we cannot offer a
>> method for each URN and its combination of labels, and we should avoid such
>> static detection, as you say.
>>
>> The builder however, allows us to add a method for setting each label.
>> Since there are a small number of labels I think this should be fine. A
>> specific metric URN will have a specific set of labels which we can
>> validate being set. Any variant of this should use a different label (or a
>> new version in the label). Thus, the builder can give an advantage, making
>> it easier to set label values without needing to provide the correct key
>> for the label, when it's set. You just need to call the correct method.
>>
>> Perhaps it might be best to leave this open to each SDK based on its
>> language style (Builder, Factory, etc.), but make sure we use the
>> protos+runtime validation.
>>
>> On Wed, Oct 24, 2018 at 7:02 AM Robert Bradshaw 
>> wrote:
>>
>>> Thanks for bringing this to the list; it's a good question.
>>>
>>> I think the difficulty comes from trying to statically define a list
>>> of possibilities that should instead be runtime values. E.g.
>>> currently we're up to about a dozen distinct types, and having a
>>> setter for each is both verbose and not very extensible (especially
>>> replicated cross language). I'm not sure the set of possible labels is
>>> fixed either. Generally in the FnAPI we've been using shared constants
>>> for this kind of thing instead. (I was wary about the protos for the
>>> same reasons; it would be good to avoid leaking this through to each
>>> of the various SDKs.) The amount of static typing/validation one gets
>>> depends on how much logic you build into each of these methods (e.g.
>>> does it (almost?) always "metric" which is assumed to already be
>>> encoded correctly, or a specific type that has tradeoffs with the
>>> amount you can do generically (e.g. we have code that first loops over
>>> counters, then over distributions, then over gauges, and I don't think
>>> we want to continue that pattern in all M places for all N types)).
>>>
>>> I would probably lean towards mostly doing runtime validation here.
>>> Specifically, one would have a data file that defines, for each known
>>> URN, its type along with the set of permitted/expected/required
>>> labels. When constructing a MonitoringInfo data point, one would provide
>>> a URN + value + labelMap, and optionally a type. On construction, the
>>> constructor (method, factory, whatever) would look up the URN to
>>> determine the type (or throw an error if it's both not known and not
>>> provided), which could be then used to fetch an encoder of sorts (that
>>> can go from value <-> proto encoding, possibly with some validation).
>>> If labels and/or annotations are provided and the URN is known, we can
>>> validate these as well.
>>>
>>> As for proto/enums vs. yaml, the former is nice because code
>>> generation comes for free, but has turned out to be much more verbose
>>> (both the definition and the use) and I'm still on the fence whether
>>> it's a net win.
>>>
>>> 
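
The runtime validation flow described above (look up the URN, resolve its type, check labels, then encode the value) might be sketched as follows. The spec entries reuse URNs and type URNs quoted elsewhere in this thread, but the function name, the inlined table, the encoder table, and the returned dictionary are all assumptions made for illustration; in particular, letting the spec's type win over a caller-provided type is a choice of this sketch, not something the thread decided.

```python
# Hypothetical spec table: in the proposal this would come from a shared data
# file (proto or YAML); it is inlined here to keep the sketch runnable.
SPECS = {
    "beam:metric:user": {
        "type_urn": "beam:metrics:sum_int_64",
        "required_labels": [],
    },
    "beam:metric:element_count:v1": {
        "type_urn": "beam:metrics:sum_int_64",
        "required_labels": ["PTRANSFORM"],
    },
}

# Hypothetical per-type encoders (value -> encoded form, with validation).
ENCODERS = {
    "beam:metrics:sum_int_64": int,
}


def create_monitoring_info(urn, value, labels, type_urn=None):
    spec = SPECS.get(urn)
    if spec is not None:
        # Known URN: the spec supplies the type and the required labels.
        type_urn = spec["type_urn"]
        missing = [name for name in spec["required_labels"]
                   if name not in labels]
        if missing:
            raise ValueError("URN %s is missing labels: %s" % (urn, missing))
    elif type_urn is None:
        # Unknown URN and no type provided: nothing to encode with.
        raise ValueError(
            "URN %s is not known and no type was provided" % urn)
    # Unknown types pass the value through unchanged in this sketch.
    encode = ENCODERS.get(type_urn, lambda v: v)
    return {
        "urn": urn,
        "type_urn": type_urn,
        "labels": dict(labels),
        "value": encode(value),
    }


info = create_monitoring_info(
    "beam:metric:element_count:v1", 7, {"PTRANSFORM": "MyTransform"})
```

An unknown URN is still accepted here as long as the caller passes type_urn explicitly, which keeps the set of metrics open to extension at runtime.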

Re: [PROPOSAL] Additional design for the Beam Python State and Timers API

2018-10-30 Thread Lukasz Cwik
My concerns are around item 4 (left the same comments in the doc).

What window should timers be using when looking up a side input?

A) The window corresponding to the element that set the original timer.
B) The window that would have been assigned based upon when the timer is
scheduled to fire.
C) The window that would have been assigned from the "output" watermark
hold time

A makes the most sense to me since it represents the side input that the
original element would have accessed. This allows people to schedule a
timer to "wait" for a side input refresh. It also handles the side input
push back issue.

I'm not sure if B or C would allow different useful user scenarios that you
would not be able to capture otherwise.

How do any of these strategies impact the side input garbage collection?

On Fri, Oct 26, 2018 at 9:47 AM Kenneth Knowles  wrote:

> It all sounds very useful but I have basic concerns about item 1. The doc
> doesn't really seem to go into the design concerns that I have in mind.
>
>  - map / flatMap are universal functions with definitions that we don't
> own and shouldn't violate
>  - corollary: map / flatMap have per element parallelism with no
> dependencies between them
>  - the doc says "map is just implemented as DoFn.process()" which is
> implementation, not the spec
>
> So suggestions:
>
>  - how about just giving it a new name, making it clear it is neither map
> nor flatMap and does not have per-element parallelism
>  - spec out the functionality without reference to DoFn
>  - be explicit about what determines the maximum parallelism / which
> elements are required to be processed serially (generally key+window)
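
To make the parallelism point concrete, here is a small plain-Python sketch with no Beam involved. It assumes, as the last bullet suggests, that elements sharing a key and window must be processed serially; the element data and variable names are made up for illustration.

```python
from collections import defaultdict

# Made-up elements as (key, window, value); windows are just labels here.
elements = [
    ("user1", "w0", 1),
    ("user1", "w0", 2),
    ("user2", "w0", 3),
    ("user1", "w1", 4),
]

# Elements that share a (key, window) pair share state, so each group is a
# unit of serial processing.
serial_groups = defaultdict(list)
for key, window, value in elements:
    serial_groups[(key, window)].append(value)

# Distinct groups can run in parallel; elements within a group cannot.
max_parallelism = len(serial_groups)
print(max_parallelism)  # 3
```

A plain map over the same four elements could in principle run four ways in parallel, which is the difference in contract the suggestions above are asking the spec to spell out.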
>
> Kenn
>
> On Fri, Oct 26, 2018 at 2:49 AM Robert Bradshaw 
> wrote:
>
>> Thanks. They make sense to me (and would have been handy when I was
>> writing the state tests for the Fn API).
>> On Fri, Oct 26, 2018 at 10:48 AM Charles Chen  wrote:
>> >
>> > Hey there,
>> >
>> > A while back, I shared the Beam Python State and Timers API proposal (
>> https://s.apache.org/beam-python-user-state-and-timers) with this list
>> [1]; we reached consensus on the features proposed there and I implemented
>> the API surface described there, along with the reference DirectRunner
>> implementation and some shared code for executing such pipelines (see for
>> example https://github.com/apache/beam/pull/5691 and
>> https://github.com/apache/beam/pull/6304).
>> >
>> > I would like to propose some additional design considerations to
>> improve upon the previous State / Timer API proposal as described here (on
>> pages 14-18 that I have appended to the doc):
>> https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.10nb33sz7u16
>> >
>> > Briefly, these are the additional design considerations proposed to
>> improve upon the API:
>> >
>> > Allowing access to user state / timers in Map / FlatMap callables /
>> lambdas
>> > Allowing access to the key during the timer callback
>> > Allowing access to auxiliary timer data
>> > Allowing access to side inputs during the timer callback
>> >
>> > I would really appreciate any feedback you may have.  Thanks!
>> >
>> > Best,
>> > Charles
>> >
>> >
>> > [1] Previous dev@ discussion thread:
>> https://lists.apache.org/thread.html/51ba1a00027ad8635bc1d2c0df805ce873995170c75d6a08dfe21997@%3Cdev.beam.apache.org%3E
>>
>


Re: error with DirectRunner

2018-10-30 Thread Lukasz Cwik
Udi, do you know if we have a bug tracking this issue?

If not, can you file one referencing this e-mail thread?

On Tue, Oct 30, 2018 at 6:33 AM Allie Chen  wrote:

> Thanks Udi. I agree, since it works fine removing either the side input or
> the last flatten and combine operation.
>
> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri  wrote:
>
>> This looks like a FnApiRunner bug.
>> When I override use_fnapi_runner = False in direct_runner.py the pipeline
>> works.
>>
>> It seems like either the side-input to _copy_number or the Flatten
>> operation is the culprit.
>>
>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen  wrote:
>>
>>> Hi,
>>>
>>> I have a project that started failing with DirectRunner, but works well
>>> using DataflowRunner (last working version is 2.4). The error message I
>>> received is:
>>> line 1088, in run_stage
>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>
>>> I have simplified the pipeline to the following example. Can someone
>>> please take a look? Many thanks!
>>>
>>> Allie
>>>
>>>
>>> import apache_beam as beam
>>> import argparse
>>> from apache_beam import transforms
>>> from apache_beam import pvalue
>>> from apache_beam.options import pipeline_options
>>>
>>>
>>> def _copy_number(number, side=None):
>>>   yield number
>>>
>>>
>>> def fn_sum(values):
>>>   return sum(values)
>>>
>>>
>>> def run(argv=None):
>>>   parser = argparse.ArgumentParser()
>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>   numbers = [1, 2]
>>>   with beam.Pipeline(options=options) as p:
>>>     sum_1 = (p
>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>
>>>     sum_2 = (p
>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>
>>>     _ = ((sum_1, sum_2)
>>>          | beam.Flatten()
>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>
>>>
>>>
>>>


Re: Flink 1.6 Support

2018-10-30 Thread Thomas Weise
There has not been any decision to move to 1.6.x for the next release yet.

There has been related general discussion about upgrading runners recently
[1]

Overall we need to consider the support for newer Flink versions that users
find (the Flink version in distributions and what users typically have in
their deployment stacks). These upgrades are not automatic/cheap/fast, so
there is a balance to strike.

The good news is that with Beam 2.8.0 you should be able to make a build
for 1.6.x with just a version number change [2]  (Other compile differences
have been cleaned up.)

[1]
https://lists.apache.org/thread.html/0588ed783767991aa36b00b8529bbd29b3a8958ee6e82fca83ac2938@%3Cdev.beam.apache.org%3E
[2]
https://github.com/apache/beam/blob/v2.8.0/runners/flink/build.gradle#L49


On Tue, Oct 30, 2018 at 10:50 AM Lukasz Cwik  wrote:

> +dev 
>
> On Tue, Oct 30, 2018 at 10:30 AM Jins George 
> wrote:
>
>> Hi Community,
>>
>> Noticed that the Beam 2.8 release comes with a Flink 1.5.x dependency.
>> Are there any plans to upgrade Flink to 1.6.x in the next Beam release?
>> (I am looking for the better k8s support in Flink 1.6.)
>>
>> Thanks,
>>
>> Jins George
>>
>>


Re: Flink 1.6 Support

2018-10-30 Thread Lukasz Cwik
+dev 

On Tue, Oct 30, 2018 at 10:30 AM Jins George  wrote:

> Hi Community,
>
> Noticed that the Beam 2.8 release comes with a Flink 1.5.x dependency.
> Are there any plans to upgrade Flink to 1.6.x in the next Beam release?
> (I am looking for the better k8s support in Flink 1.6.)
>
> Thanks,
>
> Jins George
>
>


Re: error with DirectRunner

2018-10-30 Thread Allie Chen
Thanks Udi. I agree, since it works fine removing either the side input or
the last flatten and combine operation.

On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri  wrote:

> This looks like a FnApiRunner bug.
> When I override use_fnapi_runner = False in direct_runner.py the pipeline
> works.
>
> It seems like either the side-input to _copy_number or the Flatten
> operation is the culprit.
>
> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen  wrote:
>
>> Hi,
>>
>> I have a project that started failing with DirectRunner, but works well
>> using DataflowRunner (last working version is 2.4). The error message I
>> received is:
>> line 1088, in run_stage
>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>
>> I have simplified the pipeline to the following example. Can someone
>> please take a look? Many thanks!
>>
>> Allie
>>
>>
>> import apache_beam as beam
>> import argparse
>> from apache_beam import transforms
>> from apache_beam import pvalue
>> from apache_beam.options import pipeline_options
>>
>>
>> def _copy_number(number, side=None):
>>   yield number
>>
>>
>> def fn_sum(values):
>>   return sum(values)
>>
>>
>> def run(argv=None):
>>   parser = argparse.ArgumentParser()
>>   _, pipeline_args = parser.parse_known_args(argv)
>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>   numbers = [1, 2]
>>   with beam.Pipeline(options=options) as p:
>>     sum_1 = (p
>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>
>>     sum_2 = (p
>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>
>>     _ = ((sum_1, sum_2)
>>          | beam.Flatten()
>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>
>>
>>
>>
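
As a sanity check on what the simplified pipeline quoted above is expected to produce, here is the same arithmetic in plain Python with no Beam involved. This is only a sketch: it assumes the side input passed to _copy_number has no effect on its output (which matches the function as posted), and it says nothing about the FnApiRunner failure itself.

```python
def fn_sum(values):
    return sum(values)


numbers = [1, 2]

# CalculateSum1: combine all numbers globally.
sum_1 = fn_sum(numbers)

# _copy_number yields each element unchanged and ignores the side input,
# so CalculateSum2 sees the same elements as CalculateSum1.
copied = [number for number in numbers]
sum_2 = fn_sum(copied)

# Flatten merges the two single-element results; CalculateSum3 sums them.
sum_3 = fn_sum([sum_1, sum_2])

print(sum_1, sum_2, sum_3)  # 3 3 6
```

A runner that executes the pipeline correctly would therefore be expected to write a single value, 6, to the output.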