Re: Windowing in a batch pipeline

2017-11-08 Thread Robert Bradshaw
On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble  wrote:
> Good evening. I'm trying to nail down windowing. The concept is clear, just
> struggling with writing a working pipeline. Tonight the goal is group events
> by key and window, in a batch pipeline. All data is "late" because it's a
> batch pipeline, and I expect nothing to be dropped or processed in a "late"
> context.

Traditionally, in a batch pipeline we consider no data to be late, as
we have perfect knowledge of the watermark.

> Read sections 7 and 8 of the Beam Programming Guide roughly twice.
> Sifted through the examples; WindowedWordCount is close, but it doesn't use
> triggering, which is where (2b) is probably off track.
>
> 1)
> PCollection<Foo> is created through a series of transforms, including a
> Join.leftOuterJoin(). Apply a timestamp with something simple:
>
> collection.apply("add window timestamp",
>  ParDo.of(new DoFn<Foo, Foo>() {
>   @ProcessElement
>   public void map(ProcessContext context) {
>Foo element = context.element();
>Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
>context.outputWithTimestamp(element, timestamp);
>   }
>  }));
>
> This fails with "java.lang.IllegalArgumentException: Cannot output with
> timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> Javadoc for details on changing the allowed skew."
>
> Is this expected? I don't care about skew, just want to set the timestamp
> per element.
>
> I worked around this by applying the timestamp earlier in the pipeline,
> right after a TextIO.read(). Why does that fix the problem?

I would suspect that very-far-in-the-future timestamp is the end of
the global window, set as the timestamp of the result of a
group-by-key.

You can set your timestamps earlier, as you have done, but in this
case they will get reset after passing through any GBK. It's possible
you could get what you want by setting TimestampCombiner to EARLIEST
(see 
https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
but probably the right solution is to set the allowed timestamp skew
to infinity (or Long.MAX_VALUE or similar).

Generally this skew is needed in streaming to hold the watermark back
the right amount... Definitely not intuitive in your case; we should
think about whether there's something better we could do here.
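
For concreteness, a minimal untested sketch of the skew workaround (reusing
the Foo and getActivityUnixSeconds() from your snippet):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Duration;
import org.joda.time.Instant;

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  // Permit output timestamps arbitrarily earlier than the input's.
  @Override
  public Duration getAllowedTimestampSkew() {
   return Duration.millis(Long.MAX_VALUE);
  }

  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   context.outputWithTimestamp(element,
    new Instant(element.getActivityUnixSeconds() * 1000L));
  }
 }));

I believe the TimestampCombiner alternative would be adding
.withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window
transform instead.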

> 2a)
> After applying the timestamp, let's window!
>
> collection.apply("window into sessions",
>  Window.into(Sessions.withGapDuration(Duration.standardMinutes(10))))
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> Now I see an output file, what joy! But the output file is empty. Confirmed
> that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> because the default trigger is incorrect for my use case? I expected not to
> need triggering in batch context, but the DefaultTrigger Javadoc makes me
> believe otherwise.
>
> 2b)
> How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> produce output when the watermark passes the end of the {@link BoundedWindow
> window}". I don't know, but let's try. There's some error about allowed
> lateness and firing panes, so I'll try values that look standard:
>
> collection.apply("window into sessions",
>  Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))
>
> .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> This yields a new error:
> "java.lang.IllegalStateException: TimestampCombiner moved element from
> 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> of global window) for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow"
>
> So I'm probably looking in the wrong place.

I think if you resolve the issues above then this will take care of itself.

- Robert


Windowing in a batch pipeline

2017-11-08 Thread Jacob Marble
Good evening. I'm trying to nail down windowing. The concept is clear, just
struggling with writing a working pipeline. Tonight the goal is group
events by key and window, in a batch pipeline. All data is "late" because
it's a batch pipeline, and I expect nothing to be dropped or processed in a
"late" context.

Read sections 7 and 8 of the Beam Programming Guide roughly twice.
Sifted through the examples; WindowedWordCount is close, but it doesn't use
triggering, which is where (2b) is probably off track.

1)
PCollection<Foo> is created through a series of transforms, including a
Join.leftOuterJoin(). Apply a timestamp with something simple:

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
   context.outputWithTimestamp(element, timestamp);
  }
 }));

This fails with "java.lang.IllegalArgumentException: Cannot output with
timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the allowed skew."

*Is this expected? I don't care about skew, just want to set the timestamp
per element.*

I worked around this by applying the timestamp earlier in the pipeline,
right after a TextIO.read(). Why does that fix the problem?

2a)
After applying the timestamp, let's window!

collection.apply("window into sessions",
 Window.into(Sessions.withGapDuration(Duration.standardMinutes(10))))
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

Now I see an output file, what joy! *But the output file is empty.* Confirmed
that the PCollection feeding TextIO.write() is seeing data. Maybe this is
because the default trigger is incorrect for my use case? I expected not to
need triggering in batch context, but the DefaultTrigger Javadoc makes me
believe otherwise.

2b)
How about the Never.ever() trigger? Javadoc: "Using this trigger will only
produce output when the watermark passes the end of the {@link
BoundedWindow window}". I don't know, but let's try. There's some error
about allowed lateness and firing panes, so I'll try values that look
standard:

collection.apply("window into sessions",
 Window.into(Sessions.withGapDuration(Duration.standardMinutes(10)))
  .triggering(Never.ever()).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

This yields a new error:
"java.lang.IllegalStateException: TimestampCombiner moved element from
294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
of global window) for window org.apache.beam.sdk.transforms.windowing.GlobalWindow"

So I'm probably looking in the wrong place.

Thanks!

Jacob


Re: design pattern for enriching data via db lookups?

2017-11-08 Thread Lukasz Cwik
For joining with external data you have some options:
* Do direct calls to the external datastore and perform your own in-memory
caching/expiration. You control exactly what happens and when it happens,
but as you have done this in the past, you know what this entails.
* Ingest the external data and perform a CoGroupByKey (CoGBK) on a common
key. Works well for datasets which have comparable data sizes (first sketch
below).
* Ingest the external data and use it as a map/multimap side input. Works
well for datasets where one dataset is much smaller than the other
(especially if the dataset can fit in memory); second sketch below.
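
For the CoGBK option, the shape is roughly this (untested sketch with toy
String data; "p" is your Pipeline, and the key/value types are placeholders
for your own):

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Both inputs keyed by the common key.
PCollection<KV<String, String>> keyedDocs =
    p.apply("docs", Create.of(KV.of("Los Angeles Dodgers", "doc-1")));
PCollection<KV<String, String>> keyedExternal =
    p.apply("external", Create.of(KV.of("Los Angeles Dodgers", "123456")));

final TupleTag<String> docTag = new TupleTag<>();
final TupleTag<String> externalTag = new TupleTag<>();

// One output element per key, grouping the values from both inputs.
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(docTag, keyedDocs)
        .and(externalTag, keyedExternal)
        .apply(CoGroupByKey.create());

Each CoGbkResult then exposes getAll(docTag) / getAll(externalTag) for the
values grouped under that key.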

Based on your dataset being small, I would suggest the side input approach.
When you ingest the external data, you can perform any transforms required
to get to the common key, including running Beam SQL (available for Cloud
Dataflow, but not yet officially supported). As for ingesting the external
data, it all depends on where it is coming from, but the closest IO
connector to your data source is the best.
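
A rough, untested sketch of the side input approach (again toy data; the
analogous helper in the Python SDK is beam.pvalue.AsDict):

import java.util.Map;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// The small external table, materialized as a Map on every worker.
// View.asMap() requires unique keys; use View.asMultimap() otherwise.
PCollection<KV<String, String>> externalData =
    p.apply("external", Create.of(KV.of("Los Angeles Dodgers", "123456")));
final PCollectionView<Map<String, String>> lookupView =
    externalData.apply(View.asMap());

PCollection<String> docs = p.apply("docs", Create.of("Los Angeles Dodgers"));
PCollection<String> enriched = docs.apply("enrich",
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> lookup = c.sideInput(lookupView);
        // Attach the looked-up entity id (null when the key is absent).
        c.output(c.element() + " -> " + lookup.get(c.element()));
      }
    }).withSideInputs(lookupView));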


On Thu, Nov 2, 2017 at 10:26 AM, Christopher Bockman wrote:

> Hi,
>
> We're evaluating Beam and trying to figure out if it meets our needs
> (suspect it does!), and, if so, how to best set up our code.
>
> Many thanks in advance.
>
> *Basic scenario:*
>
> * Data (for machine learning/prediction) comes in.  Data is a set of
> documents which are 100% independent.  We want to apply some
> transformations to those items on a per-doc basis.
>
> - Many of the transformations are easily and obviously encapsulated in
> beam user code.
>
> - *However, we'd also like to enrich the data via queries to external
> databases.  How do we do that efficiently *(largely in time, but also in
> compute resources)*?*
>
> *Additional constraints:*
>
> - We are developing on Google Cloud, if it matters.
>
> - Ideally we can achieve below in Python (versus Java), to ease porting
> existing code.
>
> *Some examples:*
>
> 1) *Key-value query.*
>
> Text comes in, and we'd like to do some pre-processing to the text, and
> then look up certain subsets of that text against an external database.
> Those found mappings need to be associated with the underlying text.
>
> E.g., imagine we're doing Named Entity Recognition and trying to augment
> with a large, fine-grained external gazetteer.
>
> "I went to the Los Angeles Dodgers game."  (RIP)
>
> Perhaps we generate ngrams ("I", ..., "I went", "went to", ..., "I went
> to", ..., "Los Angeles Dodgers", ...) and then find that "Los Angeles
> Dodgers" maps to entity 123456, and "Los Angeles" maps to 234567, and we
> want to map those back into the underlying document.
>
> 2) *More sophisticated query.*
>
> We do a bunch of calculations on the data, and then derive some
> intermediary result, and need to look that result up against an external
> database to generate a final result for the data.  These queries might
> require a bit more SQL sophistication (table joining, looking up multiple
> rows and filtering, etc.).
>
> * Scenario #1 is more important than #2, because, worst case, we can
> probably cast most of our external enrichment to a key-value paradigm.
>
> *The concern: the database query becomes the choke point*
>
> * The most naive implementation would seem to be to write user code that
> grabs each doc and does a remote database lookup for that doc.
>
> We initially had this implemented (outside of Beam), but found that
> (unsurprisingly) *the round-trip to the database became a blocker*--code
> would just be waiting on the DB round-trip and so processing slowed down
> dramatically (from keeping the db local via an ultimately unmanageable
> local SQLite instance).
>
> Our solution was to 1) implement multi-threading (to limit the db queries
> blocking) and 2) implement local caching of lookups (using
> https://dogpilecache.readthedocs.io/en/latest/usage.html).  Both of these
> dramatically sped things up for the single-machine (non-Beam) scenario.
>
> *Is there an equivalent (direct code or design pattern) of either #1 or #2
> in Beam?  *(The answer to #1 might just be that Beam automatically adds
> more documents to be processed when it realizes things are slower than they
> "should be"?)
>
> *Possible solution?: pull the table(s), in full, down to the Beam cluster*
>
> * The tables we're working with aren't terribly large by modern standards
> (although I'd like to design for this potentially changing)--maybe a few GB
> at most, and probably easily shaved down to 100s of MBs.  Conceptually--if
> quicker--we could (I think?) do something like pull the entire table down
> in a PCollection and then use that data "locally" (i.e., within the Beam
> cluster).
>
> - Presumably, for key-value lookup, we could write some query to
> efficiently cross-reference the two PCollection's (i.e., the db and the
> actual source data).  (...although I

Re: London Apache Beam meetup 2: call for speakers

2017-11-08 Thread Matthias Baetens
No worries JB, I'll send you a message on how we can plan around this
(reschedule the meetup or postpone your session).
Thanks for the heads-up, have fun in Singapore!

Best,
Matthias

On Tue, Nov 7, 2017 at 04:52, Jean-Baptiste Onofré wrote:

> Hi,
>
> unfortunately, I have to decline the invite as I will be at Strata
> Singapore at the same time :(
>
> I'm very sorry about that. You can count on me for the 3rd edition!
>
> Regards
> JB
>
> On 11/07/2017 01:41 AM, Matthias Baetens wrote:
> > Hi all!
> >
> > Hope you are well.
> > We are back for a second edition of the London Apache Beam meetup,
> aiming for
> > the 5th of December.
> >
> > We are pretty excited to announce that our first speaker will be
> > Jean-Baptiste Onofré himself!
> >
> > If you have an interesting *use-case* to share and are in London on the
> *5th of
> > December*, don't hesitate to reach out to me :)
> > Else: keep track of the meetup page to be updated on our
> > activity in the space.
> >
> > Best,
> > Matthias
> > --
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
--


Re: IBM Streams now supports Apache Beam Java applications

2017-11-08 Thread Shen Li
Hi

> Do you have IBM-specific IOs?  Any plans to contribute this or other
parts into Apache, or to get more involved with the community?

The SDK contains an IBM Cloud Object Storage (Swift API) IO and an IBM
Streams-specific Pub/Sub. There is a newer version of IBM COS coming up using the
S3 API. We'd love to contribute an IO connector for it, maybe by wrapping
the existing HadoopFileSystem IO.

> what do you think about adding the runner in the Beam website capability
matrix?

We will follow the contribution guide to add IBM Streams to Beam's
capability matrix.

Best,
Shen



On Wed, Nov 8, 2017 at 8:36 AM, Ismaël Mejía  wrote:

> Congratulations, this is a nice feature for the IBM Cloud and of
> course great news for the Apache Beam community.
>
> Do you have IBM-specific IOs? I noticed you guys have an
> implementation of the OpenStack's Swift FileSystem as part of your
> SDK. Any plans to contribute this or other parts into Apache, or to
> get more involved with the community?
>
> Congratulations again, and thanks for bringing this good news.
>
> On Wed, Nov 8, 2017 at 5:44 AM, Jean-Baptiste Onofré wrote:
> > That's awesome!
> >
> > I'm curious:
> > - can you provide some highlights in terms of supported features (internal),
> > like automatic scaling, etc.? I saw the capability matrix on the streamsx
> > documentation.
> > - what do you think about adding the runner in the Beam website capability
> > matrix? Just to give visibility and list the runner.
> >
> > Thanks anyway, it's great.
> >
> > Regards
> > JB
> >
> >
> > On 11/07/2017 11:45 PM, Daniel Debrunner wrote:
> >>
> >> We are excited to announce the release of IBM Streams Runner for Apache
> >> Beam.
> >>
> >> Beam users can submit Beam 2.0 Java applications to IBM Cloud
> >> Streaming Analytics Service (free trials are available,
> >> https://console.bluemix.net/catalog/services/streaming-analytics ).
> >>
> >> Besides the Beam API, this runner also exposes several IBM
> >> Cloud/Streams specific features.
> >>
> >> Find more details here:
> >>
> >> https://ibmstreams.github.io/streamsx.documentation/docs/beamrunner/beamrunner-1-intro
> >>
> >> If you have any questions or requirements regarding Streams Runner,
> >> feel free to post them on StreamsDev forum:
> >> https://developer.ibm.com/answers/smartspace/streamsdev/index.html.
> >>
> >> Best,
> >> IBM Streams Team
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>


Re: IBM Streams now supports Apache Beam Java applications

2017-11-08 Thread Kenneth Knowles
This is very cool!

I'm curious about the Cloud/Streams specific features - can you give a gist
of those?

Kenn

On Tue, Nov 7, 2017 at 2:45 PM, Daniel Debrunner  wrote:

> We are excited to announce the release of IBM Streams Runner for Apache
> Beam.
>
> Beam users can submit Beam 2.0 Java applications to IBM Cloud
> Streaming Analytics Service (free trials are available,
> https://console.bluemix.net/catalog/services/streaming-analytics ).
>
> Besides the Beam API, this runner also exposes several IBM
> Cloud/Streams specific features.
>
> Find more details here:
> https://ibmstreams.github.io/streamsx.documentation/docs/beamrunner/beamrunner-1-intro
>
> If you have any questions or requirements regarding Streams Runner,
> feel free to post them on StreamsDev forum:
> https://developer.ibm.com/answers/smartspace/streamsdev/index.html.
>
> Best,
> IBM Streams Team
>


Re: IBM Streams now supports Apache Beam Java applications

2017-11-08 Thread Daniel Debrunner
On Tue, Nov 7, 2017 at 4:20 PM, Eugene Kirpichov  wrote:
> Wow, this is very exciting, thank you for the announcement!

It's great to see excitement! :-)

> This obviously provokes curiosity, so a few questions:
> - What are some features unique to this runner, i.e. features that might
> make somebody who isn't using IBM Streams consider using it just because
> of how good the Beam experience is on it? I.e. things like - ease of use /
> "no ops", performance, debugging/monitoring features, autotuning features,
> etc.

I think this is very much our 1.0 release of an IBM Streams runner, with
a focus on expanding our set of programming capabilities to include
Beam pipelines.
Thus the focus has been on seeing what Beam pipelines look like on IBM
Streams and our cloud service (Streaming Analytics on IBM Cloud), as
well as whether Beam suits the types of applications our customers
require.

We envisage future work to further leverage features of our platform.

For anyone interested in diving into Streams more, here is the IBM
link: https://www.ibm.com/cloud/streaming-analytics

> - Is this runner a good fit for running large batch jobs, or is it focused
> just on streaming use cases?

Our focus has been streaming use cases.

> - Have you tried validating the runner against the NexMark benchmark suite,
> recently added to Beam? That's an excellent way to validate both correctness
> and performance.

I believe we are looking at that, though I think it requires us to move
to supporting Beam 2.2.

> - From a brief reading of the tutorial I didn't quite understand: is it
> possible to play with this runner on a local machine?

Two answers:

a) You can execute the runner locally to submit pipelines to a
Streaming Analytics service or a distributed IBM Streams instance.
b) You can execute pipelines as Streams applications in standalone
mode (single process) but it requires a local Streams install, which
are freely available for non-production use ("Quick Start Edition").
This is available as an install or a VM.

> - For the support forum: why not StackOverflow? :) Or, do you plan to monitor 
> StackOverflow as well?

I guess it fits in with our existing Streams community. If we saw
questions on StackOverflow we would hopefully answer them. :-)

Dan.


Re: IBM Streams now supports Apache Beam Java applications

2017-11-08 Thread Ismaël Mejía
Congratulations, this is a nice feature for the IBM Cloud and of
course great news for the Apache Beam community.

Do you have IBM-specific IOs? I noticed you guys have an
implementation of the OpenStack's Swift FileSystem as part of your
SDK. Any plans to contribute this or other parts into Apache, or to
get more involved with the community?

Congratulations again, and thanks for bringing this good news.

On Wed, Nov 8, 2017 at 5:44 AM, Jean-Baptiste Onofré  wrote:
> That's awesome!
>
> I'm curious:
> - can you provide some highlights in terms of supported features (internal),
> like automatic scaling, etc.? I saw the capability matrix on the streamsx
> documentation.
> - what do you think about adding the runner in the Beam website capability
> matrix? Just to give visibility and list the runner.
>
> Thanks anyway, it's great.
>
> Regards
> JB
>
>
> On 11/07/2017 11:45 PM, Daniel Debrunner wrote:
>>
>> We are excited to announce the release of IBM Streams Runner for Apache
>> Beam.
>>
>> Beam users can submit Beam 2.0 Java applications to IBM Cloud
>> Streaming Analytics Service (free trials are available,
>> https://console.bluemix.net/catalog/services/streaming-analytics ).
>>
>> Besides the Beam API, this runner also exposes several IBM
>> Cloud/Streams specific features.
>>
>> Find more details here:
>>
>> https://ibmstreams.github.io/streamsx.documentation/docs/beamrunner/beamrunner-1-intro
>>
>> If you have any questions or requirements regarding Streams Runner,
>> feel free to post them on StreamsDev forum:
>> https://developer.ibm.com/answers/smartspace/streamsdev/index.html.
>>
>> Best,
>> IBM Streams Team
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com