Hi,

Thank you so much for your help; the collect() function works. I tried the
code below and it prints correctly.

is_empty_check = (
    dupe_records
    | "CountGloballyDupes" >> Count.Globally()
    # | "IsEmptyCheck" >> beam.Map(lambda x: x == 0)
)

is_empty_df = ib.collect(is_empty_check)
if is_empty_df.iloc[0, 0] == 0:
    print("Empty")
else:
    ib.show(is_empty_check)

I also tried using Beam DataFrames as below, but it reports that iloc is not
supported for Beam DataFrames.

is_empty_beam_df = beam.dataframe.convert.to_dataframe(is_empty_check)

if is_empty_beam_df.iloc[0, 0] == 0:
    print("Empty")
else:
    print(is_empty_beam_df)

Is there another way to implement this kind of check using Beam DataFrames?

Thanks & Regards
Rajnil Guha

On Tue, Jul 20, 2021 at 3:52 AM Robert Bradshaw <[email protected]> wrote:

> On Mon, Jul 19, 2021 at 11:12 AM Reuven Lax <[email protected]> wrote:
> >
> > I know the Java API can handle default values for a combiner when the
> PCollection is empty (though this only works in the global window). I'm not
> sure offhand about Python.
> >
> > On Mon, Jul 19, 2021 at 10:32 AM Rajnil Guha <[email protected]>
> wrote:
> >>
> >> Hi,
> >>
> >> I tried running some experiments on Interactive Runner before actually
> running the pipeline on dataflow. Below is what I did though not sure if
> this is the correct way to do it:-
> >>
> >> Let's say I want to check whether the Pcoll named is_empty_check is
> empty or not and based on which I take a decision.
> >>
> >> is_empty_check = (dupe_records | "CountGlobally" >> Count.Globally()
> >> )
>
> is_empty_check now contains a PCollection with a single integer value
> in it (possibly 0).
>
> >> if is_empty_check != 0:
>
> is_empty_check is a PCollection, never equal to the int 0.
>
> >> ib.show(is_empty_check)
> >> else:
> >> print("Empty")
> >>
> >> In the above code it works fine when the Pcoll is not empty in which
> case it executes the statement under if part.
> >> But when the Pcoll is empty it does not execute the else part and
> instead executes the if part i.e. prints 0.
>
> It sounds like what you want to do is ib.collect(is_empty_check) and
> then see if its only element is 0.
>
> >> On Mon, Jul 19, 2021 at 12:32 AM Reuven Lax <[email protected]> wrote:
> >>>
> >>> You could count the collection (with default value of zero).
> >>>
> >>> On Sun, Jul 18, 2021, 11:42 AM Rajnil Guha <[email protected]>
> wrote:
> >>>>
> >>>> Hi Reuven,
> >>>>
> >>>> Yes, for now this is a bounded PCollection.
> >>>>
> >>>> Thanks & Regards
> >>>> Rajnil Guha
> >>>>
> >>>> On Mon, Jul 19, 2021 at 12:02 AM Reuven Lax <[email protected]> wrote:
> >>>>>
> >>>>> Is this a bounded collection?
> >>>>>
> >>>>> On Sun, Jul 18, 2021, 11:17 AM Rajnil Guha <[email protected]>
> wrote:
> >>>>>>
> >>>>>> Hi Beam Users,
> >>>>>>
> >>>>>> I have a use-case where I need to check whether a Pcollection is
> empty or not. If it's not empty I need to write a message to a Pub/Sub
> topic. I am using the Python SDK and Dataflow to write and run my pipelines
> respectively. I searched but could not come across any concrete way on how
> to check whether a Pcollection is empty or not using Python and how to take
> action based on the check. Is there any way to implement this using Beam.
> >>>>>>
> >>>>>> Thanks & Regards
> >>>>>> Rajnil Guha
>
