What is the "Keyed State" in the capability matrix?

2016-06-24 Thread Shen Li
Hi,

There is a "Keyed State" row in the  "What is being computed" section of
the capability matrix. What does the "Keyed State" refer to? Is it a global
key-value store?

(
http://beam.incubator.apache.org/beam/capability/2016/03/17/capability-matrix.html
)

Thanks,

Shen


Re: What is the "Keyed State" in the capability matrix?

2016-06-24 Thread Shen Li
Hi Kenn,

Thanks for replying. I look forward to the technical docs.

Shen

On Fri, Jun 24, 2016 at 2:17 PM, Kenneth Knowles wrote:

> Hi Shen,
>
> The row refers to the ability for a DoFn in a ParDo to access per-key (and
> window) state cells that persist beyond the lifetime of an element or
> bundle. This is a feature that was in the later stages of design when the
> Beam code was donated. Hence it is a row in the matrix, but even the Beam Model
> column says "no", pending a public design proposal & consensus. Most
> runners already have a similar capability at a low level; this feature
> refers to exposing it in a nice way for users.
>
> I have a design doc that I'm busily revising to make sense for the whole
> community. I will send the doc to this list and add it to our technical
> docs folder as soon as I can get it ready. You can follow BEAM-25 [1] if
> you like, too.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-25
>
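
For readers finding this thread later: the design Kenn mentions eventually landed as Beam's State API under BEAM-25. A minimal sketch of per-key-and-window state using the annotations from that later design (none of this API existed at the time of the thread):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.values.KV;

class RunningSum extends DoFn<KV<String, Long>, KV<String, Long>> {
  // One "sum" cell per key and window; it persists across elements and bundles.
  @StateId("sum")
  private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("sum") ValueState<Long> sum) {
    Long current = sum.read();  // null on the first element for this key/window
    long updated = (current == null ? 0L : current) + c.element().getValue();
    sum.write(updated);
    c.output(KV.of(c.element().getKey(), updated));
  }
}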


How to control the parallelism when running ParDo on a PCollection?

2016-06-24 Thread Shen Li
Hi,

The document says "when a ParDo transform is executed, the elements of the
input PCollection are first divided up into some number of bundles".

How do users control the number of bundles/parallelism? Or is it completely
up to the runner?

Thanks,

Shen


Re: How to control the parallelism when running ParDo on a PCollection?

2016-06-24 Thread Shen Li
Hi Kenn,

Thanks for the explanation.

Regards,

Shen

On Fri, Jun 24, 2016 at 4:09 PM, Kenneth Knowles wrote:

> Hi Shen,
>
> It is completely up to the runner how to divide things into bundles: a bundle
> is one item of work that should fail or succeed atomically. Bundling limits
> parallelism, but does not determine it. For example, a streaming execution
> may have many bundles over time as elements arrive, regardless of
> parallelism.
>
> Kenn
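
Bundles surface in user code only through the DoFn lifecycle. An illustrative sketch in the DoFn style of this era (startBundle/finishBundle overrides; the buffering itself is made up for the example):

// The runner, not the user, decides how elements are grouped into bundles;
// a DoFn merely observes the boundaries, e.g. for per-bundle setup and flush.
new DoFn<String, String>() {
  private transient List<String> buffer;   // java.util.List / java.util.ArrayList

  @Override
  public void startBundle(Context c) {
    buffer = new ArrayList<>();            // runs once per bundle
  }

  @Override
  public void processElement(ProcessContext c) {
    buffer.add(c.element().toUpperCase());
  }

  @Override
  public void finishBundle(Context c) {
    for (String s : buffer) {
      c.output(s);                         // flushed (or retried) atomically with the bundle
    }
  }
}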


Re: How to control the parallelism when running ParDo on a PCollection?

2016-06-24 Thread Shen Li
Hi Thomas,

Thanks for the follow-up.

Shen

On Fri, Jun 24, 2016 at 4:49 PM, Thomas Groh wrote:

> We also have an active JIRA issue to support limiting parallelism on a
> per-step basis, BEAM-68:
>
> https://issues.apache.org/jira/browse/BEAM-68
>
> As Kenn noted, this is not equivalent to controls over bundling, which is
> entirely determined by the runner.
>


Sliding-Windowed PCollectionView as SideInput

2016-06-26 Thread Shen Li
Hi,

I am a little confused about how the runner should handle a side input that
comes from a sliding-windowed PCollection.

Say we have two PCollections A and B. Apply
Window.into(SlidingWindows.of...) on B, and create a View from it (call it
VB).

Then, a ParDo takes the PCollection A as the main input and VB as a side
input: A.apply(ParDo.withSideInputs(VB).of(new DoFn() {...})).

In DoFn.processElement(), when the user code calls
ProcessContext.sideInput(VB), the view of which window in VB should be
returned if the event time of the current element in A corresponds to
multiple sliding windows in B?


Thanks,

Shen


Re: Sliding-Windowed PCollectionView as SideInput

2016-06-27 Thread Shen Li
Hi Aljoscha,

Thanks for the explanation.

Shen

On Mon, Jun 27, 2016 at 4:38 AM, Aljoscha Krettek wrote:

> Hi,
> the WindowFn is responsible for mapping from main-input window to
> side-input window. Have a look at WindowFn.getSideInputWindow(). For
> SlidingWindows this takes the last possible sliding window as the
> side-input window.
>
> Cheers,
> Aljoscha
>
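
To make the mapping concrete, a small sketch (getSideInputWindow is the hook on WindowFn in the SDK of this era; later versions moved this into a WindowMappingFn):

SlidingWindows slide = SlidingWindows
    .of(Duration.standardMinutes(10))
    .every(Duration.standardMinutes(1));

// Window of the current main-input element (illustrative values).
IntervalWindow mainWindow = new IntervalWindow(new Instant(0), new Instant(60000));

// The single side-input window whose contents ProcessContext.sideInput(VB)
// returns: the last sliding window containing the main window's end.
IntervalWindow sideWindow = slide.getSideInputWindow(mainWindow);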


Window Strategy for KeyedPCollectionTuples and CoGroupByKey

2016-07-06 Thread Shen Li
Hi,

If the PCollections in a KeyedPCollectionTuple have different windowing
strategies (WindowFn, Trigger, etc.), how does a CoGroupByKey work? When
will it be triggered? How does it determine which KVs from each PCollection
to co-group?

Thanks,

Shen


Re: Window Strategy for KeyedPCollectionTuples and CoGroupByKey

2016-07-06 Thread Shen Li
Hi Robert,

Thanks for your response.

Regards,

Shen

On Wed, Jul 6, 2016 at 4:02 PM, Robert Bradshaw wrote:

> It is an error at pipeline construction time to use CoGroupByKey with
> differing windowing strategies. If you want to do such joins, you may
> want to look into using side inputs which are more flexible.
>
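
A sketch of the construction that does work: give every input the identical windowing strategy before joining. The collections rawCounts/rawNames and the tags are illustrative names:

TupleTag<Integer> countsTag = new TupleTag<>();
TupleTag<String> namesTag = new TupleTag<>();

// Same strategy on both inputs; differing strategies fail at construction time.
PCollection<KV<String, Integer>> counts = rawCounts.apply(
    Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))));
PCollection<KV<String, String>> names = rawNames.apply(
    Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1))));

PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
    .of(countsTag, counts)
    .and(namesTag, names)
    .apply(CoGroupByKey.<String>create());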


Help understanding how the Flink runner translates triggering information

2016-07-25 Thread Shen Li
Hi,

I am trying to understand how the Flink runner tells the Flink system about
the triggers defined using the Beam API. In the source code of the Flink
runner, the WindowBoundTranslator passes the windowingStrategy to the
FlinkParDoBoundWrapper, which does not seem to use it. How is the triggering
information passed to Flink?

Thanks,

Shen


Re: Help understanding how the Flink runner translates triggering information

2016-07-25 Thread Shen Li
Hi Aljoscha,

Thanks a lot for your help!

Shen

On Mon, Jul 25, 2016 at 12:31 PM, Aljoscha Krettek wrote:

> Hi,
> for that you would have to look at how Combine.PerKey and GroupByKey are
> translated. We use a GroupAlsoByWindowViaWindowSetDoFn that internally uses
> a ReduceFnRunner to manage all the windowing. The windowing strategy as
> well as the SystemReduceFn is passed to
> GroupAlsoByWindowViaWindowSetDoFn.create() to create an actual instance of
>  GroupAlsoByWindowViaWindowSetDoFn.
>
> Cheers,
> Aljoscha
>


Questions about IdentityWindowFn

2016-08-15 Thread Shen Li
Hi,

I am a little confused about the usage of internal IdentityWindowFn. My
current understanding is that it should be used when a transform wants to
change window configurations (e.g., triggering, lateness, etc.) without
modifying the window assignments. Is that correct? If so, how come
IdentityWindowFn.assignWindows() returns a collection of a single
BoundedWindow? What if the upstream WindowFn assigns an element to multiple
windows (e.g., SlidingWindows)?

Thanks,

Shen


Re: Questions about IdentityWindowFn

2016-08-15 Thread Shen Li
Hi Lukasz,

Thanks for the explanation.

Shen

On Mon, Aug 15, 2016 at 12:02 PM, Lukasz Cwik wrote:

> Some areas of the Beam model in the SDK alluded to the use of a compressed
> representation of an element along with the set of windows it is assigned
> to. However, the model itself views elements in different windows as fully
> independent, so the SDK should not place any obligation on the part of the
> runner or user to use a particular representation.
>
> This change
> <https://github.com/apache/incubator-beam/commit/08104410177063b1095bd91b24b40f9961c92cf2>
> removed those places in the SDK where an element is treated in multiple
> windows at once. This was done by exploding the compressed representation,
> so [(W1, W2), V] became [W1, V] and [W2, V].
>
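
A sketch of the exploding step in terms of the SDK's internal WindowedValue (method names may differ slightly across versions):

// [(W1, W2), V] becomes [W1, V] and [W2, V]: one fully independent
// WindowedValue per window of the compressed representation.
static <T> List<WindowedValue<T>> explode(WindowedValue<T> compressed) {
  List<WindowedValue<T>> exploded = new ArrayList<>();
  for (BoundedWindow w : compressed.getWindows()) {
    exploded.add(WindowedValue.of(
        compressed.getValue(), compressed.getTimestamp(), w, compressed.getPane()));
  }
  return exploded;
}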


PipelineOptions JSON command line argument

2016-09-02 Thread Shen Li
Hi,

I am trying to understand how I can extend PipelineOptions to add getters
and setters with my custom type. The documentation in PipelineOptionsFactory
says "JSON format is required for all other types". If I want to use
java.util.logging.Level in a getter and a setter, what JSON string should I
pass on the command line?

Thanks,

Shen


Re: PipelineOptions JSON command line argument

2016-09-02 Thread Shen Li
Hi JB,

Thanks a lot for your response. I still don't understand how to compose the
JSON command-line argument when the getter and setter type is not a simple
property (String, int, long, boolean, etc.). For example, my Options
interface looks like this:

import java.util.logging.Level;

private interface Options extends PipelineOptions {
    @Description("log level")
    Level getLogLevel();
    void setLogLevel(Level value);
}


If I want the log level to be INFO, what JSON string should I pass to the
"--logLevel=" command-line argument?

Thanks,

Shen

On Fri, Sep 2, 2016 at 9:31 AM, Jean-Baptiste Onofré wrote:

> Hi Shen,
>
> you can extend PipelineOptions like this:
>
> private interface Options extends PipelineOptions {
>     String GDELT_EVENTS_URL = "http://data.gdeltproject.org/events/";
>
>     @Description("GDELT file date")
>     @Default.InstanceFactory(GDELTFileFactory.class)
>     String getDate();
>     void setDate(String value);
>
>     @Description("Input Path")
>     String getInput();
>     void setInput(String value);
>
>     @Description("Output Path")
>     String getOutput();
>     void setOutput(String value);
>
>     class GDELTFileFactory implements DefaultValueFactory<String> {
>         public String create(PipelineOptions options) {
>             SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
>             return format.format(new Date());
>         }
>     }
> }
>
> and then:
>
> Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
>
> Regards
> JB
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
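
For reference, simple-typed properties like the ones in JB's example are populated straight from flags, with no JSON involved (values illustrative):

String[] args = {"--date=20160902", "--input=/tmp/gdelt/in", "--output=/tmp/gdelt/out"};
Options options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(Options.class);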


Compilation Failure: release-0.2.0 referencing sdk-0.3.0

2016-11-03 Thread Shen Li
Hi,

I am trying to compile the Beam 0.2.0 release, but got the following error.


[INFO] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.3:compile (default-compile) on project basic: Compilation failure: Compilation failure:
[INFO] [ERROR] /homes/hny9/shenli/Project/incubator-beam/sdks/java/maven-archetypes/starter/target/test-classes/projects/basic/project/basic/src/main/java/it/pkg/StarterPipeline.java:[60,7] method does not override or implement a method from a supertype
[INFO] [ERROR] /homes/hny9/shenli/Project/incubator-beam/sdks/java/maven-archetypes/starter/target/test-classes/projects/basic/project/basic/src/main/java/it/pkg/StarterPipeline.java:[54,7] method does not override or implement a method from a supertype

I looked at the code in StarterPipeline.java (shown below). It is trying to
override the DoFn.processElement() method. In the 0.2.0 release,
processElement is still in DoFn rather than OldDoFn, so I suspect it is
importing Beam SDK 0.3.0. I cleaned the repo and removed the beam folder
from ~/.m2, but still got the same error.

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply(Create.of("Hello", "World"))
        .apply(ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(c.element().toUpperCase());
          }
        }))
        .apply(ParDo.of(new DoFn<String, Void>() {
          @Override
          public void processElement(ProcessContext c) {
            LOG.info(c.element());
          }
        }));

    p.run();
  }
}


After the compilation, the 0.3.0 jar does exist in
~/.m2/repository/org/apache/beam/beam-sdks-java-core/0.3.0-incubating/. And
the pom.xml in
sdks/java/maven-archetypes/starter/target/test-classes/projects/basic/project/basic
is referencing the latest release in the repository:


   
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>[0-incubating, 1-incubating)</version>
</dependency>



Can someone help with this?


Thanks,


Shen
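
The symptoms point at the open-ended version range resolving to the newer 0.3.0 SDK. One way to test that theory is pinning the dependency to the release being built (an assumption, not a fix confirmed in this thread):

<!-- Pin the SDK version instead of letting the range pick the newest release. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.2.0-incubating</version>
</dependency>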


How to create a Pipeline with Cycles

2016-11-29 Thread Shen LI
Hi,

Can I use Beam to create a pipeline with cycles? For example, to implement
the Yahoo! Streaming benchmark (
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at),
can a ParDo transform consume a downstream output as a side input?

Thanks,

Shen


Re: How to create a Pipeline with Cycles

2016-11-29 Thread Shen LI
Hi Maria, Bobby,

Thanks for the explanation.

Regards,

Shen

On Tue, Nov 29, 2016 at 12:37 PM, Bobby Evans wrote:

> In my experience almost all of the time cycles are bad and cause a lot of
> debugging problems. Most of the time you can implement what you want by
> using a windowed join or group by instead.
> - Bobby
>
> On Tuesday, November 29, 2016, 11:06:44 AM CST, María García Herrero wrote:
>
> Hi Shen,
>
> No. Beam pipelines are DAGs:
> http://beam.incubator.apache.org/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/Pipeline.html
>
> Best,
>
> María
>
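
A sketch of the loop-free alternative Bobby suggests, where "events" is an illustrative PCollection<KV<String, Long>>:

// Instead of feeding results back upstream, window the stream and
// aggregate per key within each window.
PCollection<KV<String, Iterable<Long>>> grouped = events
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(10))))
    .apply(GroupByKey.<String, Long>create());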


How to serialize/deserialize a Pipeline object?

2016-12-21 Thread Shen Li
Hi,

What are the recommended ways to serialize/deserialize a Pipeline object? I
need to submit a pipeline object to the cloud for execution and fetch the
result.

Thanks,

Shen



Re: How to serialize/deserialize a Pipeline object?

2016-12-21 Thread Shen Li
Hi Kenn,

Thanks a lot for the information.

Sure, below are more details about the problem I encountered.

I am developing a runner for IBM Streams, and am exploring possible ways to
conduct integration tests. As Streams is not an open-source project, we
cannot add the full set of libraries to Maven Central repo. Nor can we
guarantee to provide a server (with Streams installed) as a long-term
Jenkins slave. So, it seems more flexible to let the runner submit the
graph to a Streams cloud service, and provide the account info through
"-DrunnableOnServicePipelineOptions" (please correct me if it does not work
in this way). The problem is that the runner cannot convert the Pipeline
into a Streams graph format without a local Streams install. So, I am
thinking about sending the serialized Pipeline to the Cloud service for
execution. Maybe I should create some intermediate format between the
Pipeline and Streams graph format. Or, is there any other way to carry out
the integration test without a Streams install?

Thanks,

Shen


On Wed, Dec 21, 2016 at 12:08 PM, Kenneth Knowles wrote:

> Hi Shen,
>
> I want to tell you (1) how things work today and (2) how we want them to be
> eventually.
>
> (1) So far, each runner translates the Pipeline to their own graph format
> before serialization, so we have not yet encountered this issue.
>
> (2) We intend to make a standard mostly-readable JSON format for a
> Pipeline. It is based on the Avro schema sketched in the design doc at
> https://s.apache.org/beam-runner-api and there is also a draft JSON schema
> at https://github.com/apache/incubator-beam/pull/662.
>
> You may wish to follow https://issues.apache.org/jira/browse/BEAM-115,
> though that is a very general ticket.
>
> Can you share any more details?
>
> Kenn
>


Re: How to serialize/deserialize a Pipeline object?

2016-12-21 Thread Shen Li
Hi Kenn, Robert,

Thanks for your help!

Shen

On Wed, Dec 21, 2016 at 2:32 PM, Kenneth Knowles wrote:

> I went ahead and filed a more specific ticket just for this use,
> https://issues.apache.org/jira/browse/BEAM-1196
>
> On Wed, Dec 21, 2016 at 11:12 AM, Robert Bradshaw <rober...@google.com.invalid> wrote:
>
> > Choosing an intermediate representation that can be serialized and
> > sent to a cloud service (where it is then translated into the actual
> > implementation representation) is a fine solution. In fact that's what
> > Dataflow itself does.
> >
> > Of course we'll want to move as close to (2) as possible once it exists.
> >
>
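
To illustrate Kenn's point (1), a rough sketch of a runner walking the Pipeline to build its own intermediate representation before serializing it. MyGraph and addVertex are hypothetical, and the visitor hooks follow the Beam Java SDK of roughly this era:

final MyGraph graph = new MyGraph();  // hypothetical runner-specific IR
pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    // One IR vertex per primitive transform; edges follow from its inputs.
    graph.addVertex(node.getFullName(), node.getTransform());
  }
});
// The IR can then be serialized (e.g. as JSON) and shipped to the service.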