That must have been the reason. I removed the extra parameter and now it works. Thanks for the help.
On Wed, 24 Feb 2021 at 02:54, Yichi Zhang <[email protected]> wrote:

> It seems that your DoFn is expecting a side input. Could you verify that
> you are actually feeding the side input to your DoFn, like
> `beam.ParDo(DebugWindowInformation(), 'extra_info')`? I suspect that the
> missing side input of your DoFn has messed up the argument translation.
>
> On Tue, Feb 23, 2021 at 5:29 PM Ahmet Altay <[email protected]> wrote:
>
>> /cc +Yichi Zhang <[email protected]>
>>
>> On Fri, Feb 19, 2021 at 2:24 AM Manninger, Matyas <
>> [email protected]> wrote:
>>
>>> Dear Beam users,
>>>
>>> I am using the following code to log debug info about my streaming
>>> pipeline:
>>>
>>> class DebugWindowInformation(beam.DoFn):
>>>     def to_runner_api_parameter(self, unused_context):
>>>         pass
>>>
>>>     def process(self, data_item, extra='',
>>>                 timest=beam.DoFn.TimestampParam,
>>>                 windowparam=beam.DoFn.WindowParam, *args):
>>>         import logging
>>>         # GCP does NOT log debug on ROOT level
>>>         print(type(windowparam))
>>>         print(windowparam)
>>>         logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
>>>         #logging.info(f'[{datetime.datetime.now()}] [{timest.to_utc_datetime()}] window: {windowparam.start.to_utc_datetime()}-{windowparam.end.to_utc_datetime()} message: {extra} {data_item}')
>>>         yield data_item
>>>
>>> Unfortunately I get the following error:
>>>
>>> <class 'apache_beam.transforms.core._DoFnParam'>
>>> WindowParam
>>> logging.info(f'[{datetime.datetime.now()}] [{timest}] window: {windowparam.start}-{windowparam.end} message: {extra} {data_item}')
>>> AttributeError: '_DoFnParam' object has no attribute 'start'
>>>
>>> The code is taken from the examples. Does anyone have an idea what might
>>> cause the error?
>>>
>>> Any tip is appreciated,
>>> Matyas
