Hi,

Thanks for your suggestions; I will certainly check them out. My exact
use case is to check whether the PCollection is empty and, if it is, publish
a message to a Pub/Sub topic. Other processes downstream will then consume
this message.
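
For reference, here is a minimal sketch of how this use case might stay
inside a single pipeline, combining the Count.Globally() idea from earlier
in the thread with a filter on the count. The names empty_notice and
notify_if_empty, the payload fields, and the topic argument are hypothetical,
and whether WriteToPubSub can be used in a batch job depends on the runner
and SDK version, so treat this as an assumption to verify rather than a
confirmed solution:

```python
import json


def empty_notice(count, name="dupe_records"):
    """Build the Pub/Sub payload (bytes) announcing an empty PCollection.

    The field names are illustrative only, not part of any Beam API.
    """
    payload = {"pcollection": name, "count": count, "empty": True}
    return json.dumps(payload).encode("utf-8")


def notify_if_empty(records, topic):
    """Publish one message to `topic` only when `records` has no elements."""
    # Imported here so empty_notice stays usable without Beam installed.
    import apache_beam as beam

    return (
        records
        # In the global window this yields a single count, 0 if empty.
        | "CountRecords" >> beam.combiners.Count.Globally()
        # Keep the count only when it is zero, so nothing flows otherwise.
        | "KeepIfZero" >> beam.Filter(lambda n: n == 0)
        # WriteToPubSub expects bytes by default.
        | "ToMessage" >> beam.Map(empty_notice)
        | "Publish" >> beam.io.WriteToPubSub(topic=topic)
    )
```

It would be called as notify_if_empty(dupe_records, topic) with the full
topic path. Because the decision is made by the Filter step rather than by a
Python if statement, nothing has to be collected at construction time.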

Thanks & Regards
Rajnil Guha

On Wed, Jul 21, 2021 at 10:09 PM Robert Bradshaw <rober...@google.com>
wrote:

> Beam in general doesn't support conditional pipeline construction
> based on earlier results in the pipeline.
>
> You can do things like
>
> pcoll = ...
> other_pcoll = ...
> other_pcoll_size = other_pcoll | beam.Count.Globally()
>
> pcoll_to_process_only_if_other_pcoll_is_empty = (
>     pcoll | beam.Filter(lambda x, size: size == 0,
>                         beam.pvalue.AsSingleton(other_pcoll_size)))
>
> pcoll_to_process_only_if_other_pcoll_is_empty | SomeTransform()
>
> Alternatively, you can run two pipelines sequentially, e.g.
>
> with beam.Pipeline() as p:
>    ...
>    result | beam.WriteToText(...)
>
> [manually inspect the written results]
>
> with beam.Pipeline() as p:
>     [construction here can depend on result]
>
>
> I would be curious as to what your exact use case is, as we've talked
> about supporting something like this in the past.
>
>
>
>
> On Wed, Jul 21, 2021 at 8:40 AM Rajnil Guha <rajnil94.g...@gmail.com>
> wrote:
> >
> > Yes, I am just thinking about how to modify or rewrite this piece of
> > code if I want to run my pipeline on the Dataflow runner.
> >
> > Thanks & Regards
> > Rajnil Guha
> >
> > On Wed, Jul 21, 2021 at 1:12 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >>  On Tue, Jul 20, 2021 at 12:33 PM Rajnil Guha <rajnil94.g...@gmail.com>
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > Thank you so much for your help; the collect() function works. I
> >> > tried the code below and it prints correctly.
> >> >
> >> > is_empty_check = (dupe_records
> >> >     | "CountGloballyDupes" >> Count.Globally()
> >> >     #| "IsEmptyCheck" >> beam.Map(lambda x: x == 0)
> >> > )
> >> >
> >> > is_empty_df = ib.collect(is_empty_check)
> >> > if is_empty_df.iloc[0, 0] == 0:
> >> >     print("Empty")
> >> > else:
> >> >     ib.show(is_empty_check)
> >>
> >> Yes, this should work fine.
> >>
> >> > I tried using Beam DataFrames as below, but it says iloc is not
> >> > supported for Beam DataFrames.
> >>
> >> Correct. iloc depends on ordering, and PCollections aren't in general
> >> ordered. However, see below.
> >>
> >> > is_empty_beam_df = beam.dataframe.convert.to_dataframe(is_empty_check)
> >> >
> >> > if (is_empty_beam_df.iloc[0, 0] == 0):
> >> >     print("Empty")
> >> > else:
> >> >     print(is_empty_beam_df)
> >> >
> >> > Any other way to implement similar style checks using Beam Dataframes?
> >>
> >> It sounds like you're trying to avoid calling collect. The issue
> >> is that until collect (or similar) is called, dupe_records doesn't
> >> really have any contents to look into or compare against. It is just
> >> a pointer to an expression describing how to compute it. When you call
> >> collect (or pipeline.run), the pipeline is actually executed and the
> >> result (0 or whatever) is computed.
> >>
> >> > Thanks & Regards
> >> > Rajnil Guha
> >> >
> >> > On Tue, Jul 20, 2021 at 3:52 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >> >>
> >> >> On Mon, Jul 19, 2021 at 11:12 AM Reuven Lax <re...@google.com>
> wrote:
> >> >> >
> >> >> > I know the Java API can handle default values for a combiner when
> >> >> > the PCollection is empty (though this only works in the global
> >> >> > window). I'm not sure offhand about Python.
> >> >> >
> >> >> > On Mon, Jul 19, 2021 at 10:32 AM Rajnil Guha <
> rajnil94.g...@gmail.com> wrote:
> >> >> >>
> >> >> >> Hi,
> >> >> >>
> >> >> >> I tried running some experiments on Interactive Runner before
> >> >> >> actually running the pipeline on Dataflow. Below is what I did,
> >> >> >> though I am not sure if this is the correct way to do it:
> >> >> >>
> >> >> >> Let's say I want to check whether the PCollection named
> >> >> >> is_empty_check is empty or not, and make a decision based on that.
> >> >> >>
> >> >> >> is_empty_check = (dupe_records
> >> >> >>     | "CountGlobally" >> Count.Globally()
> >> >> >> )
> >> >>
> >> >> is_empty_check now contains a PCollection with a single integer value
> >> >> in it (possibly 0).
> >> >>
> >> >> >> if is_empty_check != 0:
> >> >>
> >> >> is_empty_check is a PCollection, never equal to the int 0.
> >> >>
> >> >> >>     ib.show(is_empty_check)
> >> >> >> else:
> >> >> >>     print("Empty")
> >> >> >>
> >> >> >> The above code works fine when the PCollection is not empty, in
> >> >> >> which case it executes the statement under the if branch.
> >> >> >> But when the PCollection is empty, it does not execute the else
> >> >> >> branch and instead executes the if branch, i.e. it prints 0.
> >> >>
> >> >> It sounds like what you want to do is ib.collect(is_empty_check) and
> >> >> then see if its only element is 0.
> >> >>
> >> >> >> On Mon, Jul 19, 2021 at 12:32 AM Reuven Lax <re...@google.com>
> wrote:
> >> >> >>>
> >> >> >>> You could count the collection (with default value of zero).
> >> >> >>>
> >> >> >>> On Sun, Jul 18, 2021, 11:42 AM Rajnil Guha <
> rajnil94.g...@gmail.com> wrote:
> >> >> >>>>
> >> >> >>>> Hi Reuven,
> >> >> >>>>
> >> >> >>>> Yes, for now this is a bounded PCollection.
> >> >> >>>>
> >> >> >>>> Thanks & Regards
> >> >> >>>> Rajnil Guha
> >> >> >>>>
> >> >> >>>> On Mon, Jul 19, 2021 at 12:02 AM Reuven Lax <re...@google.com>
> wrote:
> >> >> >>>>>
> >> >> >>>>> Is this a bounded collection?
> >> >> >>>>>
> >> >> >>>>> On Sun, Jul 18, 2021, 11:17 AM Rajnil Guha <
> rajnil94.g...@gmail.com> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Hi Beam Users,
> >> >> >>>>>>
> >> >> >>>>>> I have a use case where I need to check whether a PCollection
> >> >> >>>>>> is empty or not. If it's not empty, I need to write a message to
> >> >> >>>>>> a Pub/Sub topic. I am using the Python SDK and Dataflow to write
> >> >> >>>>>> and run my pipelines, respectively. I searched but could not find
> >> >> >>>>>> any concrete way to check whether a PCollection is empty using
> >> >> >>>>>> Python and to take action based on the check. Is there any way to
> >> >> >>>>>> implement this using Beam?
> >> >> >>>>>>
> >> >> >>>>>> Thanks & Regards
> >> >> >>>>>> Rajnil Guha
>
