Re: Needed help identifying a error in running a SDF

2020-08-04 Thread Boyuan Zhang
Hi Mayank,

Which runner are you using to run your pipeline? You should enable the
'beam_fn_api' experiment by passing --experiments=beam_fn_api when you
launch the pipeline.
Also, in your code, the loop should yield the element and advance cur:

class TestDoFn(beam.DoFn):
    def process(
        self,
        element,
        restriction_tracker=beam.DoFn.RestrictionParam(TestProvider())):
        import pdb; pdb.set_trace()
        cur = restriction_tracker.current_restriction().start
        while restriction_tracker.try_claim(cur):
            # Was `return element`: yield the output instead, then advance
            # the position so the next try_claim() targets a new offset.
            yield element
            cur += 1
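
To illustrate why the change from return to yield (plus cur += 1) matters,
here is a rough, Beam-free sketch. ToyOffsetTracker is a hypothetical
stand-in written only for this example; it is not Beam's restriction
tracker and only mimics the idea that try_claim() succeeds while the
position is inside the restriction.

```python
class ToyOffsetTracker:
    """Hypothetical stand-in for a tracker over the range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def try_claim(self, position):
        # A claim succeeds only while the position is inside the range.
        return self.start <= position < self.stop


def process_with_return(element, tracker):
    cur = tracker.start
    while tracker.try_claim(cur):
        # Leaves the function on the first successful claim, so the
        # remaining positions are never claimed.
        return element


def process_with_yield(element, tracker):
    cur = tracker.start
    while tracker.try_claim(cur):
        # Emits one output per claimed position, then moves on.
        yield element
        cur += 1


tracker = ToyOffsetTracker(0, 3)
returned = process_with_return("a", tracker)
yielded = list(process_with_yield("a", tracker))
print(returned)
print(yielded)
```

With return, the loop body runs once and the rest of the range is never
claimed; with yield and cur += 1, every position in the range is claimed
and the loop ends when try_claim() fails.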



On Tue, Aug 4, 2020 at 11:07 AM Mayank Ketkar wrote:

> Hello Team,
>
> I was hoping to get anyone's help with an error I'm encountering in
> running an SDF.
>
> I posted the question on Stack Overflow (includes code):
>
> https://stackoverflow.com/questions/63252327/error-in-running-apache-beam-python-splittabledofn
>
> However, I am receiving an error:
> RuntimeError: Transform node
> AppliedPTransform(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey,
> _GroupByKeyOnly) was not replaced as expected.
>
> when trying to apply an SDF to a pubsubIO source.
>
> Thanks in advance!! Really!!
>
> Mayank
>


Needed help identifying a error in running a SDF

2020-08-04 Thread Mayank Ketkar
Hello Team,

I was hoping to get anyone's help with an error I'm encountering in
running an SDF.

I posted the question on Stack Overflow (includes code):
https://stackoverflow.com/questions/63252327/error-in-running-apache-beam-python-splittabledofn

However, I am receiving an error:
RuntimeError: Transform node
AppliedPTransform(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey,
_GroupByKeyOnly) was not replaced as expected.

when trying to apply an SDF to a pubsubIO source.

Thanks in advance!! Really!!

Mayank