Hi,

Thanks for your suggestions, I will surely check them out. My exact use case is to check whether the PCollection is empty and, if it is, publish a message to a Pub/Sub topic. Other processes will then consume this message downstream.
Thanks & Regards
Rajnil Guha

On Wed, Jul 21, 2021 at 10:09 PM Robert Bradshaw <rober...@google.com> wrote:

> Beam in general doesn't support conditional pipeline construction
> based on earlier results in the pipeline.
>
> You can do things like
>
>     pcoll = ...
>     other_pcoll = ...
>     other_pcoll_size = other_pcoll | beam.combiners.Count.Globally()
>
>     pcoll_to_process_only_if_other_pcoll_is_empty = (
>         pcoll | beam.Filter(lambda x, size: size == 0,
>                             beam.pvalue.AsSingleton(other_pcoll_size)))
>
>     pcoll_to_process_only_if_other_pcoll_is_empty | SomeTransform()
>
> Alternatively, you can run two pipelines sequentially, e.g.
>
>     with beam.Pipeline() as p:
>         ...
>         result | beam.io.WriteToText(...)
>
>     [manually inspect the written results]
>
>     with beam.Pipeline() as p:
>         [construction here can depend on result]
>
> I would be curious as to what your exact use case is, as supporting
> something like this is something we've talked about in the past.
>
> On Wed, Jul 21, 2021 at 8:40 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >
> > Yes, I am just thinking about how to modify/rewrite this piece of code if I want to run my pipeline on the Dataflow runner.
> >
> > Thanks & Regards
> > Rajnil Guha
> >
> > On Wed, Jul 21, 2021 at 1:12 AM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> On Tue, Jul 20, 2021 at 12:33 PM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >
> >> > Hi,
> >> >
> >> > Thank you so much for your help, the collect() function works. I tried the code below and it prints correctly.
> >> >
> >> >     is_empty_check = (dupe_records | "CountGloballyDupes" >> Count.Globally()
> >> >                       #| "IsEmptyCheck" >> beam.Map(lambda x: x == 0)
> >> >                       )
> >> >
> >> >     is_empty_df = ib.collect(is_empty_check)
> >> >     if(is_empty_df.iloc[0,0] == 0):
> >> >         print("Empty")
> >> >     else:
> >> >         ib.show(is_empty_check)
> >>
> >> Yes, this should work fine.
> >>
> >> > I tried using Beam DataFrames as below, but it says iloc() is not supported for Beam DataFrames.
> >>
> >> Correct.
> >> iloc depends on ordering, and PCollections aren't in general
> >> ordered. However, see below.
> >>
> >> >     is_empty_beam_df = beam.dataframe.convert.to_dataframe(is_empty_check)
> >> >
> >> >     if (is_empty_beam_df.iloc[0, 0] == 0):
> >> >         print("Empty")
> >> >     else:
> >> >         print(is_empty_beam_df)
> >> >
> >> > Is there any other way to implement similar checks using Beam DataFrames?
> >>
> >> It sounds like you're trying to get around using collect. The issue
> >> is, until "collect" (or similar) is called, dupe_records doesn't
> >> really have any contents to look into or compare against. All it is is
> >> a pointer to an expression of how to compute it. When you call collect
> >> (or pipeline.run) the pipeline is actually executed and the result (0
> >> or whatever) is computed.
> >>
> >> > Thanks & Regards
> >> > Rajnil Guha
> >> >
> >> > On Tue, Jul 20, 2021 at 3:52 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>
> >> >> On Mon, Jul 19, 2021 at 11:12 AM Reuven Lax <re...@google.com> wrote:
> >> >> >
> >> >> > I know the Java API can handle default values for a combiner when the PCollection is empty (though this only works in the global window). I'm not sure offhand about Python.
> >> >> >
> >> >> > On Mon, Jul 19, 2021 at 10:32 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >> >>
> >> >> >> Hi,
> >> >> >>
> >> >> >> I tried running some experiments on the Interactive Runner before actually running the pipeline on Dataflow. Below is what I did, though I'm not sure if this is the correct way to do it:
> >> >> >>
> >> >> >> Let's say I want to check whether the PCollection named is_empty_check is empty or not, and take a decision based on that.
> >> >> >>
> >> >> >>     is_empty_check = (dupe_records | "CountGlobally" >> Count.Globally()
> >> >> >>                       )
> >> >>
> >> >> is_empty_check now contains a PCollection with a single integer value
> >> >> in it (possibly 0).
> >> >>
> >> >> >>     if is_empty_check != 0:
> >> >>
> >> >> is_empty_check is a PCollection, never equal to the int 0.
> >> >>
> >> >> >>         ib.show(is_empty_check)
> >> >> >>     else:
> >> >> >>         print("Empty")
> >> >> >>
> >> >> >> The above code works fine when the PCollection is not empty, in which case it executes the statement under the if branch.
> >> >> >> But when the PCollection is empty, it does not execute the else branch and instead executes the if branch, i.e. it prints 0.
> >> >>
> >> >> It sounds like what you want to do is ib.collect(is_empty_check) and
> >> >> then see if its only element is 0.
> >> >>
> >> >> >> On Mon, Jul 19, 2021 at 12:32 AM Reuven Lax <re...@google.com> wrote:
> >> >> >>>
> >> >> >>> You could count the collection (with default value of zero).
> >> >> >>>
> >> >> >>> On Sun, Jul 18, 2021, 11:42 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >> >>>>
> >> >> >>>> Hi Reuven,
> >> >> >>>>
> >> >> >>>> Yes, for now this is a bounded PCollection.
> >> >> >>>>
> >> >> >>>> Thanks & Regards
> >> >> >>>> Rajnil Guha
> >> >> >>>>
> >> >> >>>> On Mon, Jul 19, 2021 at 12:02 AM Reuven Lax <re...@google.com> wrote:
> >> >> >>>>>
> >> >> >>>>> Is this a bounded collection?
> >> >> >>>>>
> >> >> >>>>> On Sun, Jul 18, 2021, 11:17 AM Rajnil Guha <rajnil94.g...@gmail.com> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Hi Beam Users,
> >> >> >>>>>>
> >> >> >>>>>> I have a use case where I need to check whether a PCollection is empty or not. If it's not empty, I need to write a message to a Pub/Sub topic. I am using the Python SDK and Dataflow to write and run my pipelines, respectively. I searched but could not find any concrete way to check whether a PCollection is empty using Python, or how to take action based on the check. Is there any way to implement this using Beam?
> >> >> >>>>>>
> >> >> >>>>>> Thanks & Regards
> >> >> >>>>>> Rajnil Guha