Re: Indexing a PCollection
Sure, I'm considering implementing distributed matrix factorization for
collaborative filtering in Beam as a project to teach myself the framework.
One useful preprocessing step is to map the users and items to rows and
columns of the ratings matrix. To make the matrix as dense as possible, it
is useful (though not strictly required) for those row and column numbers
to be sequential integers.

All that said, I'm very new to the framework and I'm not 100% sure that you
can even naively express this algorithm in Beam. Still, the primitive I
described seems generically useful whenever you want to build dense
matrices/vectors.

Daniel

On Wed, Feb 20, 2019 at 9:49 PM Kenneth Knowles wrote:
> Can you share more about the end-to-end use case?
>
> Kenn
>
> On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich wrote:
>> My workflow requires me to assign arbitrary unique sequential integers
>> to every distinct element in a PCollection. [...]
Re: Indexing a PCollection
Can you share more about the end-to-end use case?

Kenn

On Wed, Feb 20, 2019 at 7:38 PM Daniel Erenrich wrote:
> My workflow requires me to assign arbitrary unique sequential integers to
> every distinct element in a PCollection. In Spark I would do this with
> .zipWithIndex, but it seems that isn't possible in Beam (there was a
> previous conversation about this on this mailing list). [...]
Indexing a PCollection
My workflow requires me to assign arbitrary unique sequential integers to
every distinct element in a PCollection. In Spark I would do this with
.zipWithIndex, but it seems that isn't possible in Beam (there was a
previous conversation about this on this mailing list). I was able to get
something to work in Beam/scio by doing the following:

  val wordsSide = distinctWords.asListSideInput
  val wordsWithWords = distinctWords.withSideInputs(wordsSide)
  val indexedWords = wordsWithWords.keyBy((x, s) => s(wordsSide).indexOf(x))

It works for my tests, but my inexperience with Beam makes me worried this
will not work in practice. Things I'm worried about: what if distinctWords
is an unbounded stream? Will that invalidate all of my previous indices?
How will this interact with windowing? Is there a more efficient way to do
this?

(And yes, I know indexOf will be very slow and I should be using a better
data structure.)

Also, if I do get this to work, would a PR adding this functionality
directly to Beam/scio be accepted? I often need something like this.

Thanks for any help,
Daniel
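For reference, a minimal sketch of a faster variant of the side-input
approach above, in scio. It assumes a bounded, already-deduplicated
distinctWords and scio's asSingletonSideInput: build the word-to-index map
once, then broadcast it so each lookup is O(1) instead of an O(n) indexOf
scan. It still materializes the whole vocabulary on a single worker, so the
unbounded-stream and windowing caveats raised above still apply:

  // Sketch only: collect all distinct words onto a single key so one
  // worker can assign sequential indices, then broadcast the map.
  val indexSide = distinctWords
    .groupBy(_ => ())                // single-key grouping (bounded input)
    .values
    .map(_.zipWithIndex.toMap)       // word -> sequential index
    .asSingletonSideInput

  val indexedWords = distinctWords
    .withSideInputs(indexSide)
    .map { (word, ctx) => (ctx(indexSide)(word), word) } // O(1) lookup
    .toSCollection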
Re: FileIO with GCP - KMS support
Hi Udi,

Thanks for that, looking forward to the 2.11 release.

JC

On Wed, Feb 20, 2019 at 10:26 PM Udi Meiri wrote:
> Hi Juan!
>
> I've recently replaced GCS copy with rewrite:
> https://github.com/apache/beam/pull/7682
> It should be available in the next Beam release (2.11).
> [...]
Re: FileIO with GCP - KMS support
Hi Juan!

I've recently replaced GCS copy with rewrite:
https://github.com/apache/beam/pull/7682
It should be available in the next Beam release (2.11).

On Wed, Feb 20, 2019 at 7:43 AM Juan Carlos Garcia wrote:
> Sorry, I hit send before verifying the right name of the method:
>
> The correct method name is: enqueueCopy
> [...]
Re: Some questions about ensuring correctness with windowing and triggering
On Tue, Feb 12, 2019 at 10:28 AM Robert Bradshaw wrote:
> Correct, even within the same key there's no promise that the event-time
> ordering of panes maps to real-time ordering, because the downstream
> operations *may* happen on a different machine. Multiply triggered
> windows add an element of non-determinism to the process.

For clarification, the stage immediately after the GBK itself processes
fired panes in order, correct? Of course, any more stages downstream of
that may see them out of order.

Raghu.

> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). Safest is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this
> easier to reason about.
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz wrote:
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear): my question is only about in-order delivery of panes, i.e.
>> will pane P always be delivered before P+1?
>>
>> I realize the use of "in-order" before could be confusing; I don't care
>> about the ordering of the elements per se, just the ordering of the
>> pane delivery.
>>
>> I want to make sure that, given a GBK that produces 3 panes (P0, P1,
>> P2) for a key, a downstream PCollection could never see P0, P2, P1. Or
>> at least, that the final firing is always guaranteed to be delivered
>> after all early firings (e.g. we could have P0, P2, P1, but then always
>> PLast).
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz wrote:
>>> Are you also saying that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in order from the Combine -> Sink transforms? This seems
>>> like a pretty big gotcha for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out I'm not talking about a global ordering
>>> across the entire PCollection; I'm talking about within the same key
>>> after a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw wrote:
>>>> Due to the nature of distributed processing, order is not preserved.
>>>> You can, however, inspect the PaneInfo to determine if an element
>>>> was early, on-time, or late and act accordingly.
>>>>
>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia wrote:
>>>>> In my experience ordering is not guaranteed; you may need to apply
>>>>> a transformation that sorts the elements and then dispatches them
>>>>> in order.
>>>>>
>>>>> Or use the Sorter extension for this:
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>
>>>>> On Tue, Feb 12, 2019 at 4:31 PM Steve Niemitz wrote:
>>>>>> Hi everyone, I have some questions I want to ask about how
>>>>>> windowing, triggering, and panes work together, and how to ensure
>>>>>> correctness throughout a pipeline.
>>>>>>
>>>>>> Let's assume I have a very simple streaming pipeline that looks
>>>>>> like: Source -> CombineByKey (Sum) -> Sink
>>>>>>
>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>> accumulating panes, this is pretty straightforward. However, this
>>>>>> can get more complicated if we add steps after the CombineByKey,
>>>>>> for instance (using the same windowing strategy):
>>>>>>
>>>>>> Say I want to buffer the results of the CombineByKey into batches
>>>>>> of N elements. I can do this with the built-in GroupIntoBatches
>>>>>> [1] transform; now my pipeline looks like:
>>>>>>
>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>
>>>>>> *This leads to my main question:*
>>>>>> Is ordering preserved somehow here? I.e., is it possible that the
>>>>>> result from early firing F+1 now comes BEFORE the firing F
>>>>>> (because it was re-ordered in the GroupIntoBatches)? This would
>>>>>> mean that the sink then gets F+1 before F, which means my
>>>>>> resulting store has incorrect data (possibly forever, if F+1 was
>>>>>> the final firing).
>>>>>>
>>>>>> If ordering is not preserved, it seems as if I'd need to introduce
>>>>>> my own ordering back in after GroupIntoBatches. GIB is an example
>>>>>> here, but I imagine this could happen with any GBK-type operation.
>>>>>>
>>>>>> Am I thinking about this the correct way? Thanks!
>>>>>>
>>>>>> [1] https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
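For reference, a minimal scio-flavored sketch of the PaneInfo inspection
Robert suggests above. This assumes scio's withPaneInfo; `summed` stands in
for the output of the CombineByKey step and is hypothetical:

  import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing

  // Pair each combined result with its pane metadata so downstream logic
  // can tell early firings apart from the on-time (final) one.
  val labeled = summed.withPaneInfo.map { case (result, pane) =>
    pane.getTiming match {
      case Timing.EARLY   => ("early", result)
      case Timing.ON_TIME => ("on-time", result)
      case Timing.LATE    => ("late", result)
      case _              => ("unknown", result)
    }
  }

Tagging elements this way lets a sink decide, for example, to persist only
on-time panes; it does not restore any ordering between panes.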
Re: FileIO with GCP - KMS support
Sorry, I hit send before verifying the right name of the method:

The correct method name is: enqueueCopy

On Wed, Feb 20, 2019 at 4:39 PM Juan Carlos Garcia wrote:
> For anyone interested in the same while waiting for KMS support, just
> copy the class org.apache.beam.sdk.util.GcsUtil into your own project.
>
> Look at / modify the enqueCopy method and replace
> storageClient.objects().copy() with storageClient.objects().rewrite(),
> add the corresponding callback, and it should work as expected.
>
> Cheers!
> [...]

--
JC
Re: FileIO with GCP - KMS support
For anyone interested in the same while waiting for KMS support, just copy
the class org.apache.beam.sdk.util.GcsUtil into your own project.

Look at / modify the enqueCopy method and replace
storageClient.objects().copy() with storageClient.objects().rewrite(), add
the corresponding callback, and it should work as expected.

Cheers!

On Wed, Feb 20, 2019 at 11:11 AM Juan Carlos Garcia wrote:
> Hi Folks,
>
> Is there any discussion going on regarding support for writes to a GCP
> bucket protected with KMS?
>
> Thanks and regards,
> --
> JC

--
JC
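For reference, a hedged sketch (Scala, against the google-api-services-storage
client) of the rewrite-based copy described above. This is not the actual
GcsUtil code; names are illustrative. One detail worth noting: unlike
objects().copy(), a single rewrite() call may return before the copy has
completed (e.g. for large objects or KMS-protected destinations) and must be
re-issued with its rewrite token until done:

  import com.google.api.services.storage.Storage

  // Illustrative blocking variant: loop on the rewrite token until the
  // service reports the copy as done.
  def rewriteBlocking(storageClient: Storage,
                      srcBucket: String, srcName: String,
                      dstBucket: String, dstName: String): Unit = {
    val request = storageClient.objects()
      .rewrite(srcBucket, srcName, dstBucket, dstName, /* content = */ null)
    var response = request.execute()
    while (!response.getDone) {
      request.setRewriteToken(response.getRewriteToken) // resume the rewrite
      response = request.execute()
    }
  }

The callback mentioned above would hook this into the batching machinery
GcsUtil uses for copies; that part is omitted here.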
FileIO with GCP - KMS support
Hi Folks,

Is there any discussion going on regarding support for writes to a GCP
bucket protected with KMS?

Thanks and regards,
--
JC
Re: Some questions about ensuring correctness with windowing and triggering
I also think that we should improve our documentation. Multiply-fired
triggers are an advanced and dangerous feature, and care needs to be taken
that the downstream portions of the pipeline handle them correctly. I filed
https://issues.apache.org/jira/browse/BEAM-6716

On Wed, Feb 20, 2019 at 5:40 AM Kenneth Knowles wrote:
>
> This is a very valid concern so I want to offer advice you can apply
> today.
>
> "We actually do track tentative vs final values already, but checking
> that at write-time would impose a pretty big overhead in the write path."
>
> You mention you are writing to Bigtable. If your key is conceptually the
> tuple of (isFinal, actual key) then you can avoid read-modify-write, and
> at query time (of your Bigtable) you can choose to only look at final
> values. There are probably variations of this so you can get "on-time"
> as another category to fetch.
>
> One reason we have not taken on the large-scale re-architecting is that
> if you control the whole pipeline it is usually possible to meet your
> business need with some combination of accumulation mode, existing
> sinks, and behavior of your downstream client.
>
> I also wanted to respond to this bit:
>
>>> I was reading this yesterday, but couldn't see how it solved the
>>> out-of-order delivery problem here...
>>
>> It moves the responsibility of doing things in the right order (and
>> even defining what order is "correct enough") to the runner (and sinks)
>> such that the side effects happen in order...
>
> Again, emphasis on sinks. Many of the ideas there just apply to sinks in
> general. For example, the sink that writes deltas and the sink that
> writes whole elements are _different sinks_. The sink that treats
> retractions as "delete" side effects and the sink that writes a row
> representing the retraction are _different sinks_. These are steps we
> could take in Beam today.
>
> Kenn
>
> On Tue, Feb 19, 2019 at 6:55 AM Steve Niemitz wrote:
>>
>> Thanks again for all the replies everyone. Just as a final follow-up
>> here, are there any concrete plans for addressing these issues that I
>> could start following? The sink trigger doc seems like a start, but
>> also seems like just a starting point in a larger re-architecture of
>> sinks.
>>
>> On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles wrote:
>>>
>>> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw wrote:
>>>>
>>>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz wrote:
>>>>>
>>>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw wrote:
>>>>>>
>>>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz wrote:
>>>>>>>
>>>>>>> Thanks again for the answers so far! I really appreciate it. As
>>>>>>> for my specific use case, we're using Bigtable as the final sink,
>>>>>>> and I'd prefer to keep our writes fully idempotent for other
>>>>>>> reasons (i.e. no read-modify-write). We actually do track
>>>>>>> tentative vs final values already, but checking that at
>>>>>>> write-time would impose a pretty big overhead in the write path.
>>>>>>>
>>>>>>> After this I actually instrumented one of my running pipelines to
>>>>>>> detect these "time traveling" panes, and did see it occurring
>>>>>>> pretty frequently, particularly when Dataflow decides to scale
>>>>>>> the job up/down, so that was interesting.
>>>>>>>
>>>>>>> From all this, it seems like using a stateful DoFn to prevent
>>>>>>> time traveling panes from overwriting newer ones is the best
>>>>>>> solution for now.
>>>>>>
>>>>>> Note that you can't "filter out" these time traveling panes,
>>>>>> because at the next fusion break they might get re-ordered again.
>>>>>
>>>>> Ack, in a general sense. To solve my specific problem my plan was
>>>>> to ensure the final writer sink would be fused to this filter step
>>>>> (or even build it directly into the DoFn itself that does the
>>>>> write), which would work in my specific case (it seems like, at
>>>>> least).
>>>>>
>>>>>>> My last question / statement is just around general education and
>>>>>>> documentation about this. I think the fact that PCollections are
>>>>>>> unordered makes sense and is pretty intuitive, but fired panes
>>>>>>> being delivered out of order seems very surprising. I'm curious
>>>>>>> how many other pipelines exist that run into this (and produce
>>>>>>> incorrect results!) without people being aware of it. Is there a
>>>>>>> way we can call this behavior out? For example, many of the
>>>>>>> sample Beam projects use early firings, but there's never any
>>>>>>> mention that the output may be out of order.
>>>>>>
>>>>>> +1 to improving the documentation here. Basically multiple firings
>>>>>> become independent elements of the resulting PCollection; they
>>>>>> don't retain any association/ordering.
>>>>>>
>>>>>> Multiply-triggered windows are difficult to reason about (and not
>>>>>> just in this case), https://s.apache.org/beam-sink-triggers is
>>>>>> IMHO the
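For reference, a rough sketch of the stateful-DoFn pane filter discussed in
this thread, written in Scala against the Beam Java state API. Everything
here is illustrative: it assumes keyed KV input, and per Robert's caveat it
only helps if it is fused with (or built into) the step that does the write:

  import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
  import org.apache.beam.sdk.transforms.DoFn
  import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
  import org.apache.beam.sdk.values.KV

  // Tracks the highest pane index seen per key and drops anything older,
  // so a "time traveling" early pane cannot overwrite a newer one.
  class DropStalePanes[K, V] extends DoFn[KV[K, V], KV[K, V]] {
    @StateId("maxPane")
    private val maxPaneSpec: StateSpec[ValueState[java.lang.Long]] =
      StateSpecs.value[java.lang.Long]()

    @ProcessElement
    def process(c: DoFn[KV[K, V], KV[K, V]]#ProcessContext,
                @StateId("maxPane") maxPane: ValueState[java.lang.Long]): Unit = {
      val index = c.pane().getIndex                     // this firing's pane index
      val seen = Option(maxPane.read()).map(_.longValue).getOrElse(-1L)
      if (index > seen) {                               // newer pane: keep and record
        maxPane.write(index)
        c.output(c.element())
      }                                                 // older pane: silently drop
    }
  }

Note that dropped panes are gone for good: if the final pane arrives before
an early one, this trades a stale overwrite for a skipped intermediate
update, which is usually the right trade for an idempotent sink.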