Re: Stateful processing with session window

2018-02-12 Thread Maurizio Sambati
Hi Kenneth,

What runner are you using? Are you trying this out in the DirectRunner? As
> far as I know, no runner supports stateful processing in session windows
> yet. It is probably a bug that your pipeline was accepted by the runner
> when it includes features that the runner cannot execute. It would be a
> real mistake to have missed this validation for the DirectRunner.
>

Oh, ok, got it. :-(
I was actually trying this on the DirectRunner and it didn't raise any errors,
so I guess no validation is performed for this case.

Support for stateful processing in merging windows is definitely important.
> There's only a Jira filed for Dataflow [1] as far as I can tell. I just
> cloned it for the DirectRunner [2] since that is how you would test your
> pipeline. If you want to follow the same feature for a different runner, I
> can route a new ticket to the right person.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-2507
> [2] https://issues.apache.org/jira/browse/BEAM-3686
>

Awesome, I'm glad there is interest in adding this feature; it really fits
our use case. (And for the moment we are only interested in the two target
runners you mentioned.)

Thanks,
Maurizio


Re: Stateful processing with session window

2018-02-12 Thread Maurizio Sambati
Hi Carlos,

What I think is happening here is that the third 'a' you see is actually in
> a different window from the other 3 a's. Stateful processing being per key and
> window means that it keeps state for each key-window pair; therefore, if your
> 'a's counter is being restarted, it is probably because it is actually a
> different one, and as the key is the same, the only possibility is that
> the window is different.
>

Yeah, that was my initial guess too; that's why I questioned whether I had
understood the semantics of the session window itself. Fortunately, as
Kenneth pointed out, my understanding was correct, but this window model is
not compatible with stateful processing yet.

Maurizio


Re: London Apache Beam meetup 2: 11/01

2018-02-12 Thread Matthias Baetens
As promised (but with some delay): the recording of our second Beam
meetup in London! Enjoy :-)

On Tue, Jan 9, 2018 at 12:30 PM, Carlos Alonso  wrote:

> Cool, thanks!!
>
>
> On Mon, Jan 8, 2018 at 1:38 PM Matthias Baetens <
> matthias.baet...@datatonic.com> wrote:
>
>> Yes, we put everything in place to record this time and hope to share the
>> recordings soon after the meetup. Stay tuned!
>>
>> On 8 Jan 2018 10:32, "Carlos Alonso"  wrote:
>>
>> Will it be recorded?
>>
>> On Fri, Jan 5, 2018 at 5:11 PM Matthias Baetens <
>> matthias.baet...@datatonic.com> wrote:
>>
>>> Hi all,
>>>
>>> Excited to announce the second Beam meetup located in the *Qubit
>>> offices* next *Thursday 11/01*.
>>>
>>> We are very excited to have JB flying in to talk about IO and Splittable
>>> DoFns, and Vadim Sobolevski to share how FutureFlow uses Beam in a
>>> finance use case.
>>>
>>> More info and RSVP here. We are looking forward
>>> to welcoming you all!
>>>
>>> Best regards,
>>> Matthias
>>>
>>
>>


-- 


*Matthias Baetens*


*datatonic | data power unleashed*

office +44 203 668 3680  |  mobile +44 74 918 20646

Level24 | 1 Canada Square | Canary Wharf | E14 5AB London


We've been announced as one of the top global Google Cloud Machine Learning
partners.


Re: London Apache Beam meetup 2: 11/01

2018-02-12 Thread Carlos Alonso
Thanks Matthias!! Lovely!

On Mon, Feb 12, 2018 at 12:24 PM Matthias Baetens <
matthias.baet...@datatonic.com> wrote:

> As promised (but with some delay): the recording of our second Beam
> meetup in London! Enjoy :-)
>
> On Tue, Jan 9, 2018 at 12:30 PM, Carlos Alonso 
> wrote:
>
>> Cool, thanks!!
>>
>>
>> On Mon, Jan 8, 2018 at 1:38 PM Matthias Baetens <
>> matthias.baet...@datatonic.com> wrote:
>>
>>> Yes, we put everything in place to record this time and hope to share
>>> the recordings soon after the meetup. Stay tuned!
>>>
>>> On 8 Jan 2018 10:32, "Carlos Alonso"  wrote:
>>>
>>> Will it be recorded?
>>>
>>> On Fri, Jan 5, 2018 at 5:11 PM Matthias Baetens <
>>> matthias.baet...@datatonic.com> wrote:
>>>
 Hi all,

 Excited to announce the second Beam meetup located in the *Qubit
 offices* next *Thursday 11/01*.

 We are very excited to have JB flying in to talk about IO and
 Splittable DoFns, and Vadim Sobolevski to share how FutureFlow uses Beam
 in a finance use case.

 More info and RSVP here. We are looking
 forward to welcoming you all!

 Best regards,
 Matthias

>>>
>>>
>
>
> --
>
>
> *Matthias Baetens*
>
>
> *datatonic | data power unleashed*
>
> office +44 203 668 3680  |  mobile +44 74 918 20646
>
> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
> 
>
>
> We've been announced as one of the top global Google Cloud Machine Learning
> partners.
>


ParDo requires its input to use KvCoder in order to use state and timers

2018-02-12 Thread Carlos Alonso
I was refactoring my solution a bit and tried to make my stateful transform
work on simple case classes, and I got this exception:
https://pastebin.com/x4xADmvL . I'd like to understand the rationale behind
this, as I think carefully choosing the keys is very important for the work
to be properly distributed.

Thanks!


Re: ParDo requires its input to use KvCoder in order to use state and timers

2018-02-12 Thread Kenneth Knowles
Hi Carlos,

You are quite correct that choosing the keys is important for work to be
evenly distributed. The reason you need to have a KvCoder is that state is
partitioned per key (to give natural & automatic parallelism) and window
(to allow reclaiming expired state so you can process unbounded data with
bounded storage, and also more parallelism). To a Beam runner, most data in
the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
case where a runner knows the binary layout of encoded data so it can pull
out the keys in order to shuffle data of the same key to the same place, so
that is why it has to be a KvCoder.

Kenn
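
A minimal sketch of that keying step in front of a stateful ParDo (the MyEvent
type, the getUserId() key function and the coders are illustrative assumptions,
not from Carlos's pipeline):

// Key each element explicitly so the stateful ParDo consumes KV pairs whose
// coder is a KvCoder, as the runner requires.
PCollection<KV<String, MyEvent>> keyed =
    events
        .apply("KeyByUser",
            WithKeys.of((MyEvent e) -> e.getUserId())
                .withKeyType(TypeDescriptors.strings()))
        // Usually inferred; set explicitly here to make the KvCoder visible.
        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(MyEvent.class)));

keyed.apply("CountPerKeyAndWindow",
    ParDo.of(new DoFn<KV<String, MyEvent>, KV<String, Long>>() {
      // The runner partitions this state per key and window.
      @StateId("count")
      private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void processElement(ProcessContext c, @StateId("count") ValueState<Long> count) {
        Long current = count.read();
        long updated = (current == null ? 0L : current) + 1;
        count.write(updated);
        c.output(KV.of(c.element().getKey(), updated));
      }
    }));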

On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso  wrote:

> I was refactoring my solution a bit and tried to make my stateful
> transform to work on simple case classes and I got this exception:
> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
> behind this as I think carefully choosing the keys would be very important
> in order for the work to be properly distributed.
>
> Thanks!
>


Re: Stateful processing with session window

2018-02-12 Thread Kenneth Knowles
On Mon, Feb 12, 2018 at 1:09 AM, Maurizio Sambati 
wrote:

> Hi Carlos,
>
> What I think is happening here is that the third 'a' you see is actually
>> on a different window of the other 3 a's. Stateful being per key and window
>> means that it keeps state for each key-window pairs, therefore, if your
>> 'a's counter is being restarted is probably because it is actually a
>> different one, and as the key is the same then the only possibility is that
>> the window is different.
>>
>
> Yeah, that was my initial guess too, that's why I have questioned if I
> have understood the semantic of the session window itself. Fortunately, as
> Kenneth pointed out, my understanding was correct but this window model is
> not compatible with stateful processing yet.
>

I want to mention something here - we do know that it is compatible with
merging windows. In fact, triggers use the state mechanism in merging
windows "under the hood". The issue is connecting it to user-defined state.
Each runner is slightly different, though it is not terribly difficult for
any of them. For BagState, it will automatically combine the bags. For
CombiningState it will automatically use mergeAccumulators. For ValueState
it will probably not be supported for a while, and perhaps eventually will
have a merge callback.

Kenn
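
For reference, a sketch of what those state kinds look like when declared in a
stateful DoFn (String elements and the state ids are illustrative); once user
state is supported under merging windows, a runner would concatenate the bag and
call the CombineFn's mergeAccumulators for the combining state:

ParDo.of(new DoFn<KV<String, String>, Void>() {
  // A bag of raw values: merging windows would simply concatenate the bags.
  @StateId("events")
  private final StateSpec<BagState<String>> eventsSpec = StateSpecs.bag(StringUtf8Coder.of());

  // A running count: merging windows would merge accumulators via Sum.ofLongs().
  @StateId("count")
  private final StateSpec<CombiningState<Long, long[], Long>> countSpec =
      StateSpecs.combining(Sum.ofLongs());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("events") BagState<String> events,
      @StateId("count") CombiningState<Long, long[], Long> count) {
    events.add(c.element().getValue());
    count.add(1L);
    // Emit or set a timer here as the use case requires.
  }
})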


Re: ParDo requires its input to use KvCoder in order to use state and timers

2018-02-12 Thread Carlos Alonso
Ok, that makes a lot of sense. Thanks Kenneth!

On Mon, Feb 12, 2018 at 5:41 PM Kenneth Knowles  wrote:

> Hi Carlos,
>
> You are quite correct that choosing the keys is important for work to be
> evenly distributed. The reason you need to have a KvCoder is that state is
> partitioned per key (to give natural & automatic parallelism) and window
> (to allow reclaiming expired state so you can process unbounded data with
> bounded storage, and also more parallelism). To a Beam runner, most data in
> the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
> case where a runner knows the binary layout of encoded data so it can pull
> out the keys in order to shuffle data of the same key to the same place, so
> that is why it has to be a KvCoder.
>
> Kenn
>
> On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso 
> wrote:
>
>> I was refactoring my solution a bit and tried to make my stateful
>> transform to work on simple case classes and I got this exception:
>> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
>> behind this as I think carefully choosing the keys would be very important
>> in order for the work to be properly distributed.
>>
>> Thanks!
>>
>
>


Re: Handling errors in IOs

2018-02-12 Thread Motty Gruda
Hi,

I managed to set the automatic reconnect through the ConnectionFactory, I
didn't know it was possible. Thanks!

What do you mean by using "split"? Now, when running on the Spark runner, if
one of the brokers becomes unavailable, the entire pipeline is stuck on the
following job:

org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:115)
org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream.<init>(SparkUnboundedSource.java:171)
org.apache.beam.runners.spark.io.SparkUnboundedSource.read(SparkUnboundedSource.java:113)
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:125)
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:119)
org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:413)
org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:399)
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:663)
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:446)
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:47)
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:776)
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:775)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)



The updated code:
Pipeline p =
    Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

ConnectionFactory factory_a = new ActiveMQConnectionFactory(
    "failover://(tcp://activemq-a:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1");
ConnectionFactory factory_b = new ActiveMQConnectionFactory(
    "failover://(tcp://activemq-b:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1");
ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");

PCollection<JmsRecord> a = p.apply("ReadFromQueue",
    JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
        .withQueue("a"));
PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
    JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
        .withQueue("b"));

PCollection<JmsRecord> combined =
    PCollectionList.of(a).and(b).apply(Flatten.pCollections());

combined.apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) -> r.getPayload()))
    .apply("WriteToQueue",
        JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
            .withPassword("admin").withQueue("c"));

p.run().waitUntilFinish();

*Trying to run the code above on spark 2.2.0 and beam 2.3.0-rc3, most of
the messages simply disappeared inside the system!!!*
*The messages were read from queues "a" and "b" but most of them didn't
arrive at queue "c".*



On Mon, Feb 12, 2018 at 5:30 AM, Jean-Baptiste Onofré 
wrote:

> Hi,
>
> here you don't use split, but different JmsIO reading from different
> queues (not the same). The two reads are not related.
>
> If you kill connection from one, you have to reconnect. That can be done
> by configuration on the ConnectionFactory.
>
> Is it what you want ? Automatically reconnect ?
>
> Regards
> JB
>
>
> On 11/02/2018 12:58, Motty Gruda wrote:
>
>> runner: spark runner (1.6.3)
>> beam: 2.2.0
>> activemq: 5.14.3
>>
>> code:
>>
>> Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).
>> withValidation().create());
>>
>> ConnectionFactory factory_a = new ActiveMQConnectionFactory("tcp
>> ://activemq-a:61616");
>> ConnectionFactory factory_b = new ActiveMQConnectionFactory("tcp
>> ://activemq-b:61616");
>> ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp
>> ://activemq-c:61616");
>>
>> PCollection a = p.apply("ReadFromQueue",
>> JmsIO.read().withConnectionFactory(factory_a).withUsername("
>> admin").withPassword("admin")
>> .withQueue("a"));
>> PCollection b = p.apply("ReadFromQueue2",
>> JmsIO.read().withConnectionFactory(factory_b).withUsername("
>> admin").withPassword("admin")
>> .withQueue("b"));
>>
>> PCollection combined = PCollectionList.of(a).and(b).a
>> pply(Flatten.pCollections());
>>
>> combined.apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord
>> r) ->

Re: Triggers based on size

2018-02-12 Thread Carlos Alonso
I've finally managed to understand, write and run my job using stateful and
timely processing. Here:
https://gist.github.com/calonso/14b06f4f58faf286cc79181cb46a9b85 you can
see the code, should someone need inspiration.

Thanks a lot for your help, for encouraging me to go that way, and for such a
great product and the amazing community you're building around it.
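
For anyone finding this thread later, here is a minimal sketch of the
state-and-timers pattern discussed above: buffer elements per key in a BagState,
track an approximate byte count in a ValueState, and flush either when a size
threshold is reached or when a processing-time timer fires. The String payloads,
the 8 MB threshold and the 15-minute timer are illustrative assumptions, not
taken from the gist:

ParDo.of(new DoFn<KV<String, String>, KV<String, Iterable<String>>>() {
  // Illustrative threshold: flush a key once roughly 8 MB of payload is buffered.
  private static final long MAX_BUFFER_BYTES = 8L * 1024 * 1024;

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value(StringUtf8Coder.of());

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

  @StateId("bufferedBytes")
  private final StateSpec<ValueState<Long>> bufferedBytesSpec = StateSpecs.value(VarLongCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("key") ValueState<String> key,
      @StateId("buffer") BagState<String> buffer,
      @StateId("bufferedBytes") ValueState<Long> bufferedBytes,
      @TimerId("flush") Timer flushTimer) {
    // Re-arm a processing-time timer so sparse keys still get flushed eventually.
    flushTimer.offset(Duration.standardMinutes(15)).setRelative();
    key.write(c.element().getKey());
    buffer.add(c.element().getValue());

    Long current = bufferedBytes.read();
    long size = (current == null ? 0L : current) + c.element().getValue().length();
    if (size >= MAX_BUFFER_BYTES) {
      Iterable<String> batch = ImmutableList.copyOf(buffer.read());
      c.output(KV.of(c.element().getKey(), batch));
      buffer.clear();
      bufferedBytes.clear();
    } else {
      bufferedBytes.write(size);
    }
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c,
      @StateId("key") ValueState<String> key,
      @StateId("buffer") BagState<String> buffer,
      @StateId("bufferedBytes") ValueState<Long> bufferedBytes) {
    String k = key.read();
    if (k != null && buffer.read().iterator().hasNext()) {
      Iterable<String> batch = ImmutableList.copyOf(buffer.read());
      c.output(KV.of(k, batch));
      buffer.clear();
      bufferedBytes.clear();
    }
  }
})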

On Wed, Jan 10, 2018 at 6:11 PM Robert Bradshaw  wrote:

> Sounds like you have enough to get started. Feel free to come back
> here with more specifics if you can't get it working.
>
> On Wed, Jan 10, 2018 at 9:09 AM, Carlos Alonso 
> wrote:
> > Thanks Robert!!
> >
> > After reading this and the former post about stateful processing
> Kenneth's
> > suggestions sounds sensible. I'll probably give them a try!! Is there
> > anything you would like to advice me before starting?
> >
> > Thanks!
> >
> > On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw 
> > wrote:
> >>
> >> Unfortunately, the metadata driven trigger is still just an idea, not
> >> yet implemented.
> >>
> >> A good introduction to state and timers can be found at
> >> https://beam.apache.org/blog/2017/08/28/timely-processing.html
> >>
> >> On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso 
> >> wrote:
> >> > Hi Robert, Kenneth.
> >> >
> >> > Thanks a lot to both of you for your responses!!
> >> >
> >> > Kenneth, unfortunately I'm not sure we're experienced enough with
> Apache
> >> > Beam to get anywhere close to your suggestion, but thanks anyway!!
> >> >
> >> > Robert, your suggestion sounds great to me, could you please provide
> any
> >> > example on how to use that 'metadata driven' trigger?
> >> >
> >> > Thanks!
> >> >
> >> > On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles 
> wrote:
> >> >>
> >> >> Often, when you need or want more control than triggers provide, such
> >> >> as
> >> >> input-type-specific logic like yours, you can use state and timers in
> >> >> ParDo
> >> >> to control when to output. You lose any potential optimizations of
> >> >> Combine
> >> >> based on associativity/commutativity and assume the burden of making
> >> >> sure
> >> >> your output is sensible, but dropping to low-level stateful
> computation
> >> >> may
> >> >> be your best bet.
> >> >>
> >> >> Kenn
> >> >>
> >> >> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw <
> rober...@google.com>
> >> >> wrote:
> >> >>>
> >> >>> We've tossed around the idea of "metadata-driven" triggers which
> would
> >> >>> essentially let you provide a mapping element -> metadata and a
> >> >>> monotonic CombineFn metadata* -> bool that would allow for this (the
> >> >>> AfterCount being a special case of this, with the mapping fn being _
> >> >>> -> 1, and the CombineFn being sum(...) >= N, for size one would
> >> >>> provide a (perhaps approximate) sizing mapping fn).
> >> >>>
> >> >>> Note, however, that there's no guarantee that the trigger fire as
> soon
> >> >>> as possible; due to runtime characteristics a significant amount of
> >> >>> data may be buffered (or come in at once) before the trigger is
> >> >>> queried. One possibility would be to follow your triggering with a
> >> >>> DoFn that breaks up large value streams into multiple manageable
> sized
> >> >>> ones as needed.
> >> >>>
> >> >>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso <
> car...@mrcalonso.com>
> >> >>> wrote:
> >> >>> > Hi everyone!!
> >> >>> >
> >> >>> > I was wondering if there is an option to trigger window panes
> based
> >> >>> > on
> >> >>> > the
> >> >>> > size of the pane itself (rather than the number of elements).
> >> >>> >
> >> >>> > To provide a little bit more of context we're backing up a PubSub
> >> >>> > topic
> >> >>> > into
> >> >>> > GCS with the "special" feature that, depending on the "type" of
> the
> >> >>> > message,
> >> >>> > the GCS destination is one or another.
> >> >>> >
> >> >>> > Messages' 'shape' published there is quite random, some of them
> are
> >> >>> > very
> >> >>> > frequent and small, some others very big but sparse... We have
> >> >>> > around
> >> >>> > 150
> >> >>> > messages per second (in total) and we're firing every 15 minutes
> and
> >> >>> > experiencing OOM errors, we've considered firing based on the
> number
> >> >>> > of
> >> >>> > items as well, but given the randomness of the input, I don't
> think
> >> >>> > it
> >> >>> > will
> >> >>> > be a final solution either.
> >> >>> >
> >> >>> > Having a trigger based on size would be great, another option
> would
> >> >>> > be
> >> >>> > to
> >> >>> > have a dynamic shards number for the PTransform that actually
> writes
> >> >>> > the
> >> >>> > files.
> >> >>> >
> >> >>> > What is your recommendation for this use case?
> >> >>> >
> >> >>> > Thanks!!
> >> >>
> >> >>
> >> >
>


Re: Handling errors in IOs

2018-02-12 Thread Jean-Baptiste Onofré
Hi Motty,

yes, you can configure reconnect, timeout, etc., on the ConnectionFactory
(especially when you use ActiveMqPooledConnectionFactory).

For the split, I didn't mean regarding the Spark cluster but more in terms of
workers.
When you use something like:

pipeline.apply(JmsIO.read().withQueue("foo"))

the runner can send a desired number of splits
(https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L407).
The IO will then create one consumer per split on the queue.

It scales in terms of consuming speed, as we have concurrent consumers on the
queue.

Regards
JB

On 02/12/2018 07:32 PM, Motty Gruda wrote:
> Hi,
> 
> I managed to set the automatic reconnect through the ConnectionFactory, I 
> didn't
> know it was possible. Thanks!
> 
> What do you mean by using "split"? Now when running on the spark runner, if 
> one
> of the brokers becomes unavailable the entire pipeline is stuck on the 
> following
> job:
> 
> org.apache.spark.streaming.dstream.DStream.(DStream.scala:115)
> org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream.(SparkUnboundedSource.java:171)
> org.apache.beam.runners.spark.io.SparkUnboundedSource.read(SparkUnboundedSource.java:113)
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:125)
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$2.evaluate(StreamingTransformTranslator.java:119)
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:413)
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:399)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:663)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:446)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:47)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:776)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:775)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)
> 
> 
> 
> The updated code:
> Pipeline p =
> Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
> 
> ConnectionFactory factory_a = new
> ActiveMQConnectionFactory("failover://(tcp://activemq-a:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1");
> ConnectionFactory factory_b = new
> ActiveMQConnectionFactory("failover://(tcp://activemq-b:61616)?initialReconnectDelay=2000&maxReconnectAttempts=-1");
> ConnectionFactory factory_c = new
> ActiveMQConnectionFactory("tcp://activemq-c:61616");
> 
> PCollection a = p.apply("ReadFromQueue",
> JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
> .withQueue("a"));
> PCollection b = p.apply("ReadFromQueue2",
> JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
> .withQueue("b"));
> 
> PCollection combined =
> PCollectionList.of(a).and(b).apply(Flatten.pCollections());
> 
> combined.apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) 
> ->
> r.getPayload()))
> .apply("WriteToQueue",
> JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
> .withPassword("admin").withQueue("c"));
> 
> p.run().waitUntilFinish();
> 
> *Trying to run the code above on spark 2.2.0 and beam 2.3.0-rc3, most of the
> messages simply disappeared inside the system!!!*
> *The messages were read from queues "a" and "b" but most of them didn't arrive
> at queue "c".*
> 
> On Mon, Feb 12, 2018 at 5:30 AM, Jean-Baptiste Onofré  > wrote:
> 
> Hi,
> 
> here you don't use split, but different JmsIO reading from different 
> queues
> (not the same). The two reads are not related.
> 
> If you kill connection from one, you have to reconnect. That can be done 
> by
> configuration on the ConnectionFactory.
> 
> Is it what you want ? Automatically reconnect ?
> 
> Regards
> JB
> 
> 
> On 11/02/2018 12:58, Motty Gruda wrote:
> 
> runner: spark runner (1.6.3)
> beam: 2.2.0
> 

working with hot keys

2018-02-12 Thread Jacob Marble
When joining (Join.leftOuterJoin etc) a PCollection of K:V1 to a PCollection
of K:V2, and K:V1 contains hot keys, my pipeline gets very slow.
It can bring processing time from hours to days.

Reading this blog post, I can see some thought has already been given to
this problem:
"To address this, we allow you to provide extra parallelism hints using the
Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout. These
operations will create an extra step in your pipeline to pre-aggregate the
data on many machines before performing the final aggregation on the target
machines."

(1 of 2)

These two solutions, Combine.PerKey.withHotKeyFanout or
Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
however. So, I solved my problem with these stages before and after the
join operation, effectively joining K:Iterable<V1> with K:V2:

kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())

Join.someJoin(kvIterable1, kv2)
    .apply(Values.create())
    .apply("undo hot key GBK",
        ParDo
            .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
              @ProcessElement
              public void fanout(ProcessContext context) {
                for (V1 v1 : context.element().getKey()) {
                  context.output(KV.of(v1, context.element().getValue()));
                }
              }
            }))

Does that look sane to people who have been working with Beam for a long
time? It has worked well for us over the last two months or so.

(2 of 2)

Lately, the size of the value has grown too large. It took some effort to
figure out the problem, which manifested as an
ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
Here's the follow-up solution, only changing the first half of the above
solution:

kvIterable1 = kv1
    .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
    .apply("partition grouped values",
        ParDo
            .of(new DoFn<KV<K, Iterable<V1>>, KV<K, List<V1>>>() {
              @ProcessElement
              public void partition(ProcessContext context) {
                K k = context.element().getKey();
                Iterable<V1> v1Iterable = context.element().getValue();
                for (List<V1> partition : Iterables.partition(v1Iterable, 100)) {
                  context.output(KV.<K, List<V1>>of(k, partition));
                }
              }
            }));

Again, is this sane? Initial testing suggests this is a good solution.

Jacob


Re: working with hot keys

2018-02-12 Thread Lukasz Cwik
The optimization that you have done is that you have forced the V1 iterable
to reside in memory completely, since it is now counted as a single element.
This will fall apart as soon as your V1 iterable exceeds memory.
Runners like Dataflow allow re-iteration of a GBK/CoGBK result, allowing the
GBK/CoGBK result to exceed the size of memory, and this currently only
functions at the first level within the value iterable, meaning that the
entire Iterable<V1> is treated as a single value in your Join.someJoin. You
should see similar performance if you take all the V1s out of the CoGBK and
"copy" them into an ArrayList inside your DoFn and then walk the V2 iterable
and the in-memory ArrayList performing the outer join. It will also likely
be easier to reason about. Note that Dataflow doesn't do this in a great
way and causes the re-iteration to happen many more times than it should,
which is why your perf numbers are ballooning.
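
A sketch of that manual join over the CoGBK result (String keys and values stand
in for K, V1 and V2; outer-join handling of keys with an empty side is omitted
for brevity):

final TupleTag<String> v1Tag = new TupleTag<String>() {};
final TupleTag<String> v2Tag = new TupleTag<String>() {};

PCollection<KV<String, CoGbkResult>> grouped =
    KeyedPCollectionTuple.of(v1Tag, kv1)
        .and(v2Tag, kv2)
        .apply(CoGroupByKey.create());

PCollection<KV<String, String>> joined =
    grouped.apply("manual join",
        ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Materialize the V1 side once instead of re-iterating the CoGbkResult.
            List<String> v1s = Lists.newArrayList(c.element().getValue().getAll(v1Tag));
            for (String v2 : c.element().getValue().getAll(v2Tag)) {
              for (String v1 : v1s) {
                c.output(KV.of(v1, v2));
              }
            }
          }
        }));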

Alternatively, have you tried putting either of the PCollections into a
multimap side input and then just doing a GBK on the other PCollection,
followed by a DoFn that joins the two together with the multimap side input?
The choice of whether V1 or V2 works better in the side input depends on
the sizes of the relative PCollections and whether the working set of the
side-input PCollection can be cached in memory (good for side input) or the
GBK PCollection is sparse enough that it won't matter if everything is a
cache miss.
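
And a sketch of the side-input alternative, assuming the V2 side is small enough
for its working set to be cached (again with String keys and values standing in
for the real types):

final PCollectionView<Map<String, Iterable<String>>> v2View =
    kv2.apply("V2 as multimap", View.<String, String>asMultimap());

PCollection<KV<String, String>> joined =
    kv1.apply("join via side input",
        ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Iterable<String> matches = c.sideInput(v2View).get(c.element().getKey());
                if (matches == null) {
                  return; // or emit the V1 value with a null V2 for left-outer semantics
                }
                for (String v2 : matches) {
                  c.output(KV.of(c.element().getValue(), v2));
                }
              }
            })
            .withSideInputs(v2View));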


On Mon, Feb 12, 2018 at 1:53 PM, Jacob Marble  wrote:

> When joining (Join.leftOuterJoin etc) a PCollection to
> PCollection, and K:V1 contains hot keys, my pipeline gets very slow.
> It can bring processing time from hours to days.
>
> Reading this blog post
> 
>  I
> can see some thought has already been given to this problem:
> "To address this, we allow you to provide extra parallelism hints using
> the Combine.PerKey.withHotKeyFanout or Combine.Globally.withFanout. These
> operations will create an extra step in your pipeline to pre-aggregate the
> data on many machines before performing the final aggregation on the target
> machines."
>
> (1 of 2)
>
> These two solutions, Combine.PerKey.withHotKeyFanout or
> Combine.Globally.withFanout, do not help with a join (CoGBK) operation,
> however. So, I solved my problem with these stages before and after the
> join operation, effectively joining K:Iterable with K:V2:
>
> kvIterable1 = kv1.apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>
> Join.someJoin(kvIterable1, kv2)
>     .apply(Values.create())
>     .apply("undo hot key GBK",
>         ParDo
>             .of(new DoFn<KV<Iterable<V1>, V2>, KV<V1, V2>>() {
>               @ProcessElement
>               public void fanout(ProcessContext context) {
>                 for (V1 v1 : context.element().getKey()) {
>                   context.output(KV.of(v1, context.element().getValue()));
>                 }
>               }
>             }))
>
> Does that look sane to people who have been working with Beam for a long
> time? It has worked well for us over the last two months or so.
>
> (2 of 2)
>
> Lately, the size of the value has grown too large. It took some effort to
> figure out the problem, which manifested as an
> ArrayIndexOutOfBoundsException emitted from RandomAccessData.write().
> Here's the follow-up solution, only changing the first half of the above
> solution:
>
> kvIterable1 = kv1
>     .apply("GBK to mitigate hot keys", GroupByKey.<K, V1>create())
>     .apply("partition grouped values",
>         ParDo
>             .of(new DoFn<KV<K, Iterable<V1>>, KV<K, List<V1>>>() {
>               @ProcessElement
>               public void partition(ProcessContext context) {
>                 K k = context.element().getKey();
>                 Iterable<V1> v1Iterable = context.element().getValue();
>                 for (List<V1> partition : Iterables.partition(v1Iterable, 100)) {
>                   context.output(KV.<K, List<V1>>of(k, partition));
>                 }
>               }
>             }));
>
> Again, is this sane? Initial testing suggests this is a good solution.
>
> Jacob
>


There are missing classes in org.apache.beam.sdk.values package

2018-02-12 Thread Mungeol Heo
Hello,

I am using the Maven dependency shown below.


<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.2.0</version>
</dependency>


However, I found that there are classes missing from the
org.apache.beam.sdk.values package, such as Row and RowType, which can be
found on GitHub at the link below.

https://github.com/apache/beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values

Any help will be very great.

Thank you.


Re: There are missing classes in org.apache.beam.sdk.values package

2018-02-12 Thread Pawel Bartoszek
Hi,

The classes you are referring to weren't available yet in Beam 2.2.0.
Please check the release-2.2.0 branch:
https://github.com/apache/beam/tree/release-2.2.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values

Cheers,
Pawel


On 13 February 2018 at 07:15, Mungeol Heo  wrote:

> Hello,
>
> I am using the maven dependency addressed below.
>
> 
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>2.2.0</version>
> </dependency>
> 
>
> However, I found that there are missing classes in
> org.apache.beam.sdk.values package.
> Such as Row and RowType which I can be found at the github placed below.
>
> https://github.com/apache/beam/tree/master/sdks/java/
> core/src/main/java/org/apache/beam/sdk/values
>
> Any help will be very great.
>
> Thank you.
>