Re: Indexing a PCollection

2019-02-20 Thread Daniel Erenrich
Sure, I'm considering implementing distributed matrix factorization for
collaborative filtering in Beam as a project to teach myself the framework.
One useful preprocessing step is to map the users and items to rows and
columns of the ratings matrix. To make the matrix as dense as possible, it
is useful (though not strictly required) for those row and column numbers
to be sequential integers.

All that said, I'm very new to the framework and I'm not 100% sure that you
can even naively express this algorithm in Beam. Still, the primitive I
described seems generically useful whenever you want to build dense
matrices/vectors.
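
To make the use case concrete, here is a single-machine sketch of that
preprocessing step (all names are illustrative, not Daniel's code; the
question in this thread is how to express the same thing as a distributed
Beam pipeline):

// Hypothetical sketch: raw ratings keyed by arbitrary user/item ids become
// matrix entries keyed by dense, sequential row/column indices.
case class Rating(userId: String, itemId: String, score: Double)
case class Entry(row: Int, col: Int, value: Double)

def densify(ratings: Seq[Rating]): Seq[Entry] = {
  // Assign sequential integers to each distinct user and item.
  val rowIndex = ratings.map(_.userId).distinct.zipWithIndex.toMap
  val colIndex = ratings.map(_.itemId).distinct.zipWithIndex.toMap
  ratings.map(r => Entry(rowIndex(r.userId), colIndex(r.itemId), r.score))
}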

Daniel

On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles  wrote:

> Can you share more about the end-to-end use case?
>
> Kenn
>
> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich 
> wrote:
>
>> My workflow requires me to assign arbitrary unique sequential integers to
>> every distinct element in a PCollection. In Spark I would do this with
>> .zipWithIndex, but it seems that isn't possible in Beam (there was a
>> previous conversation about this on this mailing list). I was able to get
>> something to work in Beam/Scio by doing the following:
>>
>> val wordsSide = distinctWords.asListSideInput
>> val wordsWithWords = distinctWords.withSideInputs(wordsSide)
>> val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))
>>
>> It works for my tests, but my inexperience with Beam makes me worried this
>> will not work in practice. Things I'm worried about: What if
>> distinctWords is an unbounded stream? Will that invalidate all of my
>> previous indices? How will this interact with windowing? Is there a more
>> efficient way to do this?
>>
>> (And yes, I know indexOf will be very slow and I should be using a better
>> data structure.)
>>
>> Also, if I do get this to work, would a PR adding this functionality
>> directly into Beam/Scio be accepted? I often need something like this.
>>
>> Thanks for any help,
>> Daniel
>>
>>


Re: Indexing a PCollection

2019-02-20 Thread Kenneth Knowles
Can you share more about the end-to-end use case?

Kenn

On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich  wrote:

> My workflow requires me to assign arbitrary unique sequential integers to
> every distinct element in a PCollection. In Spark I would do this with
> .zipWithIndex, but it seems that isn't possible in Beam (there was a
> previous conversation about this on this mailing list). I was able to get
> something to work in Beam/Scio by doing the following:
>
> val wordsSide = distinctWords.asListSideInput
> val wordsWithWords = distinctWords.withSideInputs(wordsSide)
> val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))
>
> It works for my tests, but my inexperience with Beam makes me worried this
> will not work in practice. Things I'm worried about: What if distinctWords is
> an unbounded stream? Will that invalidate all of my previous indices? How
> will this interact with windowing? Is there a more efficient way to do this?
>
> (And yes, I know indexOf will be very slow and I should be using a better
> data structure.)
>
> Also, if I do get this to work, would a PR adding this functionality
> directly into Beam/Scio be accepted? I often need something like this.
>
> Thanks for any help,
> Daniel
>
>


Indexing a PCollection

2019-02-20 Thread Daniel Erenrich
My workflow requires me to assign arbitrary unique sequential integers to
every distinct element in a PCollection. In Spark I would do this with
.zipWithIndex, but it seems that isn't possible in Beam (there was a
previous conversation about this on this mailing list). I was able to get
something to work in Beam/Scio by doing the following:

val wordsSide = distinctWords.asListSideInput
val wordsWithWords = distinctWords.withSideInputs(wordsSide)
val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))

It works for my tests, but my inexperience with Beam makes me worried this
will not work in practice. Things I'm worried about: What if distinctWords is
an unbounded stream? Will that invalidate all of my previous indices? How
will this interact with windowing? Is there a more efficient way to do this?

(And yes, I know indexOf will be very slow and I should be using a better
data structure.)
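
On the data-structure point, here is a sketch of one way to trade the linear
indexOf scan for a map lookup, assuming Scio's groupBy, flatMapValues, and
asMapSideInput, and a bounded input (the single synthetic key serializes the
numbering, so this won't scale to huge vocabularies):

// Sketch only: assign indices once, then look words up in a map side input.
val indexSide = distinctWords
  .groupBy(_ => ())               // one group holding every distinct word
  .flatMapValues(_.zipWithIndex)  // (word, index) pairs
  .values
  .asMapSideInput                 // side input of Map[String, Int]

val indexedWords = distinctWords
  .withSideInputs(indexSide)
  .map { (word, ctx) => (ctx(indexSide)(word), word) }
  .toSCollection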

Also, if I do get this to work, would a PR adding this functionality
directly into Beam/Scio be accepted? I often need something like this.

Thanks for any help,
Daniel


Re: FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
Hi Udi,

Thanks for that; looking forward to the 2.11 release.

JC

Udi Meiri  wrote on Wed, Feb 20, 2019, 22:26:

> Hi Juan!
>
> I've recently replaced GCS copy with rewrite:
> https://github.com/apache/beam/pull/7682
> It should be available in the next Beam release (2.11).
>
> On Wed, Feb 20, 2019 at 7:43 AM Juan Carlos Garcia 
> wrote:
>
>> Sorry, I hit send before verifying the right name of the method:
>>
>> The correct method name is: *enqueueCopy*
>>
>> On Wed, Feb 20, 2019 at 4:39 PM Juan Carlos Garcia 
>> wrote:
>>
>>> For anyone interested in the same while waiting for KMS support, just
>>> copy the class *org.apache.beam.sdk.util.GcsUtil* into your own project.
>>>
>>> Look at / modify the *enqueCopy* method and replace
>>> *storageClient.objects().copy()* with *storageClient.objects().rewrite()*,
>>> add the corresponding callback, and it should work as expected.
>>>
>>> Cheers!
>>>
>>>
>>> On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
>>> wrote:
>>>
 Hi Folks,

 Is there any discussion going on regarding support for writing to a GCP
 bucket protected with KMS?

 Thanks and regards,
 --

 JC


>>>
>>> --
>>>
>>> JC
>>>
>>>
>>
>> --
>>
>> JC
>>
>>


Re: FileIO with GCP - KMS support

2019-02-20 Thread Udi Meiri
Hi Juan!

I've recently replaced GCS copy with rewrite:
https://github.com/apache/beam/pull/7682
It should be available in the next Beam release (2.11).

On Wed, Feb 20, 2019 at 7:43 AM Juan Carlos Garcia 
wrote:

> Sorry, I hit send before verifying the right name of the method:
>
> The correct method name is: *enqueueCopy*
>
> On Wed, Feb 20, 2019 at 4:39 PM Juan Carlos Garcia 
> wrote:
>
>> For anyone interested in the same while waiting for KMS support, just
>> copy the class *org.apache.beam.sdk.util.GcsUtil* into your own project.
>>
>> Look at / modify the *enqueCopy* method and replace
>> *storageClient.objects().copy()* with *storageClient.objects().rewrite()*,
>> add the corresponding callback, and it should work as expected.
>>
>> Cheers!
>>
>>
>> On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
>> wrote:
>>
>>> Hi Folks,
>>>
>>> Is there any discussion going on regarding support for writing to a GCP
>>> bucket protected with KMS?
>>>
>>> Thanks and regards,
>>> --
>>>
>>> JC
>>>
>>>
>>
>> --
>>
>> JC
>>
>>
>
> --
>
> JC
>
>


Re: Some questions about ensuring correctness with windowing and triggering

2019-02-20 Thread Raghu Angadi
On Tue, Feb 12, 2019 at 10:28 AM Robert Bradshaw 
wrote:

> Correct, even within the same key there's no promise that the event-time
> ordering of panes maps to real-time ordering, because the downstream
> operations *may* happen on a different machine. Multiply-triggered
> windows add an element of non-determinism to the process.
>

For clarification: the stage immediately after the GBK itself processes fired
panes in order, correct? Of course, any further stages downstream of that may
see them out of order.

Raghu.

>
> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). The safest approach is to only write the final pane to the sink,
> and handle early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
>
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz  wrote:
>
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear): my question is only about in-order delivery of panes, i.e. will
>> pane P always be delivered before pane P+1?
>>
>> I realize the use of "in-order" before could be confusing; I don't care
>> about the ordering of the elements per se, just the ordering of the pane
>> delivery.
>>
>> I want to make sure that, given a GBK that produces 3 panes (P0, P1, P2)
>> for a key, a downstream PCollection could never see P0, P2, P1. Or, at
>> least, that the final firing is always guaranteed to be delivered after
>> all early firings (e.g. we could have P0, P2, P1, but then always PLast).
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz 
>> wrote:
>>
>>> Are you also saying that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in order from the Combine -> Sink transforms? This seems like a
>>> pretty big gotcha for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out that I'm not talking about a global ordering
>>> across the entire PCollection; I'm talking about ordering within the same
>>> key after a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw 
>>> wrote:
>>>
 Due to the nature of distributed processing, order is not preserved.
 You can, however, inspect the PaneInfo to determine if an element was
 early, on-time, or late and act accordingly.

 On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia 
 wrote:

> In my experience ordering is not guaranteed; you may need to apply a
> transformation that sorts the elements and then dispatches them in order.
>
> Or use the Sorter extension for this:
>
> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>
> Steve Niemitz  wrote on Tue, Feb 12, 2019, 16:31:
>
>> Hi everyone, I have some questions I want to ask about how windowing,
>> triggering, and panes work together, and how to ensure correctness
>> throughout a pipeline.
>>
>> Let's assume I have a very simple streaming pipeline that looks like:
>> Source -> CombineByKey (Sum) -> Sink
>>
>> Given fixed windows of 1 hour, early firings every minute, and
>> accumulating panes, this is pretty straightforward. However, this can get
>> more complicated if we add steps after the CombineByKey, for instance
>> (using the same windowing strategy):
>>
>> Say I want to buffer the results of the CombineByKey into batches of
>> N elements. I can do this with the built-in GroupIntoBatches [1]
>> transform; now my pipeline looks like:
>>
>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>
>> *This leads to my main question:*
>> Is ordering preserved somehow here? I.e., is it possible that the
>> result from early firing F+1 now comes BEFORE firing F (because it was
>> re-ordered in the GroupIntoBatches)? This would mean that the sink then
>> gets F+1 before F, which means my resulting store has incorrect data
>> (possibly forever, if F+1 was the final firing).
>>
>> If ordering is not preserved, it seems as if I'd need to introduce my
>> own ordering after GroupIntoBatches. GIB is an example here, but I
>> imagine this could happen with any GBK-type operation.
>>
>> Am I thinking about this the correct way?  Thanks!
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>
>


Re: FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
Sorry, I hit send before verifying the right name of the method:

The correct method name is: *enqueueCopy*

On Wed, Feb 20, 2019 at 4:39 PM Juan Carlos Garcia 
wrote:

> For anyone interested in the same while waiting for KMS support, just
> copy the class *org.apache.beam.sdk.util.GcsUtil* into your own project.
>
> Look at / modify the *enqueCopy* method and replace
> *storageClient.objects().copy()* with *storageClient.objects().rewrite()*,
> add the corresponding callback, and it should work as expected.
>
> Cheers!
>
>
> On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
> wrote:
>
>> Hi Folks,
>>
>> Is there any discussion going on regarding support for writing to a GCP
>> bucket protected with KMS?
>>
>> Thanks and regards,
>> --
>>
>> JC
>>
>>
>
> --
>
> JC
>
>

-- 

JC


Re: FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
For anyone interested in the same while waiting for KMS support, just
copy the class *org.apache.beam.sdk.util.GcsUtil* into your own project.

Look at / modify the *enqueCopy* method and replace
*storageClient.objects().copy()* with *storageClient.objects().rewrite()*,
add the corresponding callback, and it should work as expected.
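
One detail worth flagging for anyone adapting this: unlike copy, a rewrite
call may return before the copy has finished and must be resumed with its
token. A rough sketch of the blocking shape, assuming the
google-api-services-storage client (this is not the actual GcsUtil code,
and the helper name is illustrative):

import com.google.api.services.storage.Storage
import com.google.api.services.storage.model.RewriteResponse

// Sketch: rewrite copies within GCS and can re-encrypt with the destination
// bucket's default KMS key; large objects may need several resumed calls.
def rewriteBlocking(storage: Storage,
                    srcBucket: String, srcName: String,
                    dstBucket: String, dstName: String): Unit = {
  val request =
    storage.objects().rewrite(srcBucket, srcName, dstBucket, dstName, null)
  var response: RewriteResponse = request.execute()
  while (!response.getDone) {
    // Resume where the previous call left off.
    request.setRewriteToken(response.getRewriteToken)
    response = request.execute()
  }
}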

Cheers!


On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia 
wrote:

> Hi Folks,
>
> Is there any discussion going on regarding support for writing to a GCP
> bucket protected with KMS?
>
> Thanks and regards,
> --
>
> JC
>
>

-- 

JC


FileIO with GCP - KMS support

2019-02-20 Thread Juan Carlos Garcia
Hi Folks,

Is there any discussion going on regarding support for writing to a GCP
bucket protected with KMS?

Thanks and regards,
-- 

JC


Re: Some questions about ensuring correctness with windowing and triggering

2019-02-20 Thread Robert Bradshaw
I also think that we should improve our documentation. Multiply-fired
triggers are an advanced and dangerous feature, and care needs to be
taken that the downstream portions of the pipeline handle them
correctly.

I filed https://issues.apache.org/jira/browse/BEAM-6716
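
Until then, one defensive pattern is to make pane timing explicit in the
pipeline itself. A minimal sketch, assuming Scio's withPaneInfo and an
upstream SCollection named sums (both names are for illustration):

import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing

// Sketch: tag each element with its pane's timing so downstream stages can
// treat early firings differently from the final result.
val tagged = sums
  .withPaneInfo                    // SCollection[(T, PaneInfo)]
  .map { case (value, pane) =>
    val label = pane.getTiming match {
      case Timing.EARLY   => "early"
      case Timing.ON_TIME => "on-time"
      case Timing.LATE    => "late"
      case _              => "unknown"
    }
    (label, pane.isLast, value)
  }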

On Wed, Feb 20, 2019 at 5:40 AM Kenneth Knowles  wrote:
>
> This is a very valid concern, so I want to offer advice you can apply today.
>
> "We actually do track tentative vs final values already, but checking that at 
> write-time would impose a pretty big overhead in the write path."
>
> You mention you are writing to BigTable. If your key is conceptually the 
> tuple of (isFinal, actual key) then you can avoid read-modify-write, and at 
> query time (of your BigTable) you can choose to only look at final values. 
> There are probably variations of this so you can get "on-time" as another 
> category to fetch.
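>
> A minimal sketch of that keying scheme, assuming Scio and an SCollection of
> (key, value) results (the row-key format here is illustrative only):
>
> // Sketch: fold pane finality into the row key so queries can select only
> // final values and skip tentative (early-firing) ones.
> val rows = results
>   .withPaneInfo
>   .map { case ((key, value), pane) =>
>     val suffix = if (pane.isLast) "final" else "tentative"
>     (s"$key#$suffix", value)
>   }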
>
> One reason we have not taken on the large-scale re-architecting is that if 
> you control the whole pipeline it is usually possible to meet your business 
> need with some combination of accumulation mode, existing sinks, and behavior 
> of your downstream client.
>
> I also wanted to respond to this bit:
>
> >> I was reading this yesterday, but couldn't see how it solved the 
> >> out-of-order delivery problem here...
>
> > It moves the responsibility of doing things in the right order (and even 
> > defining what order is "correct enough") to the runner (and sinks) such 
> > that the side effects happen in order...
>
> Again, emphasis on sinks. Many of the ideas there just apply to sinks in
> general. For example, the sink that writes deltas and the sink that writes
> whole elements are _different sinks_. The sink that treats retractions as
> "delete" side effects and the sink that writes a row representing the
> retraction are _different sinks_. These are steps we could take in Beam today.
>
> Kenn
>
>
> On Tue, Feb 19, 2019 at 6:55 AM Steve Niemitz  wrote:
>>
>> Thanks again for all the replies, everyone. Just as a final follow-up here,
>> are there any concrete plans on addressing these issues that I could start
>> following? The sink trigger doc seems like a start, but also seems like
>> just a starting point in a larger re-architecture of sinks.
>>
>> On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles  wrote:
>>>
>>>
>>>
>>> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw  wrote:

 On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz  
 wrote:
>
>
> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw  
> wrote:
>>
>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz  
>> wrote:
>>>
>>> Thanks again for the answers so far! I really appreciate it. As for
>>> my specific use case, we're using Bigtable as the final sink, and I'd
>>> prefer to keep our writes fully idempotent for other reasons (i.e. no
>>> read-modify-write). We actually do track tentative vs. final values
>>> already, but checking that at write time would impose a pretty big
>>> overhead in the write path.
>>>
>>> After this I actually instrumented one of my running pipelines to
>>> detect these "time-traveling" panes, and did see them occurring pretty
>>> frequently, particularly when Dataflow decides to scale the job up or
>>> down, so that was interesting.
>>>
>>> From all this, it seems like using a stateful DoFn to prevent
>>> time-traveling panes from overwriting newer ones is the best solution
>>> for now.
>>
>>
>> Note that you can't "filter out" these time-traveling panes, because at
>> the next fusion break they might get re-ordered again.
>
>
> Ack, in a general sense. To solve my specific problem, my plan was to
> ensure the final writer sink would be fused to this filter step (or even
> build it directly into the DoFn that does the write), which would
> work in my specific case (it seems, at least).
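>
> For reference, a rough sketch of that filter as a stateful DoFn (Beam's
> Java state API used from Scala; the fusion into the sink is not shown, and
> the class and state names are illustrative):
>
> import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
> import org.apache.beam.sdk.transforms.DoFn
> import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
> import org.apache.beam.sdk.values.KV
>
> class DropStalePanes[K, V] extends DoFn[KV[K, V], KV[K, V]] {
>   @StateId("maxPane")
>   private val maxPaneSpec: StateSpec[ValueState[java.lang.Long]] =
>     StateSpecs.value()
>
>   @ProcessElement
>   def process(c: DoFn[KV[K, V], KV[K, V]]#ProcessContext,
>               @StateId("maxPane") maxPane: ValueState[java.lang.Long]): Unit = {
>     val newest = Option(maxPane.read()).map(_.longValue).getOrElse(-1L)
>     if (c.pane().getIndex > newest) {
>       maxPane.write(c.pane().getIndex)  // remember the newest pane seen
>       c.output(c.element())
>     } // else: a stale, "time-traveling" pane; drop it
>   }
> }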
>
>>
>>
>>>
>>> My last question / statement is just around general education and
>>> documentation about this. I think the fact that PCollections are
>>> unordered makes sense and is pretty intuitive, but fired panes being
>>> delivered out of order seems very surprising. I'm curious how many
>>> other pipelines exist that run into this (and produce incorrect
>>> results!) without people being aware of it. Is there a way we can call
>>> this behavior out? For example, many of the sample Beam projects use
>>> early firings, but there's never any mention that the output may be
>>> out of order.
>>
>>
>> +1 to improving the documentation here. Basically, multiple firings
>> become independent elements of the resulting PCollection; they don't
>> retain any association/ordering.
>>
>> Multiply-triggered windows are difficult to reason about (and not just in
>> this case); https://s.apache.org/beam-sink-triggers is IMHO the