Re: Monitor number of dropped elements due to lateness in Samza/Flink runner
Sudden problems running a test pipeline
Hello all,

I have been writing pipelines with Beam, and suddenly my unit tests are failing with a weird exception. This is a sample pipeline I have that basically does nothing:

    key = os.environ['FMPREPKEY']

    with TestPipeline() as p:
        (p
         | 'start run_mm' >> beam.Create(['20210101'])
         | 'prnt' >> beam.Map(print)
        )

Even so, I get the exception below. Have I somehow messed up my imports?

Kind regards,
Marco

    self = ...
    action = _StoreAction(option_strings=['--key'], dest='key', nargs=None, const=None, default=None, type=None, choices=None, help=None, metavar=None)
    conflicting_actions = [('--key', _StoreAction(option_strings=['--key'], dest='key', nargs=None, const=None, default=None, type=None, choices=None, help=None, metavar=None))]

        def _handle_conflict_error(self, action, conflicting_actions):
            message = ngettext('conflicting option string: %s',
                               'conflicting option strings: %s',
                               len(conflicting_actions))
            conflict_string = ', '.join([option_string
                                         for option_string, action
                                         in conflicting_actions])
    >       raise ArgumentError(action, message % conflict_string)
    E       argparse.ArgumentError: argument --key: conflicting option string: --key
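One plausible reading of the traceback (an assumption, not confirmed in the thread) is that the error comes from argparse itself. argparse refuses to register the same option string twice on one parser. If two PipelineOptions subclasses visible to the test both add `--key`, and Beam collects their arguments onto a single parser, the result would be exactly this message. The sketch below reproduces the message using only plain argparse, with no Beam involved:

```python
import argparse

# A single parser, as when several option classes contribute arguments to one parser.
parser = argparse.ArgumentParser()
parser.add_argument('--key')

# Registering the same option string a second time triggers argparse's
# default conflict_handler ('error'), which raises ArgumentError.
try:
    parser.add_argument('--key')
    conflict = None
except argparse.ArgumentError as err:
    conflict = str(err)

print(conflict)
```

If this is the cause, the fix is to make sure `--key` is defined in only one options class. Alternatively, one of the two options could be renamed to something more specific (for example a hypothetical `--fmp_key`).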
Exception handling in ReadFromTextWithFilename?
Hi friends,

I recently hit an issue with the Beam Python SDK (2.43.0). I was using ReadFromTextWithFilename on a Google Cloud Storage (GCS) bucket containing roughly 95k gzip-compressed CSV files. One of the files had been truncated in transit, so the job ran for a couple of hours and then failed with an exception like `zlib.error: Error -3 while decompressing data: incorrect header check`, raised from within the `apache_beam.io.filesystem` module.

The exception didn't include the name of the truncated file. Looking through the standard library, I also couldn't find any way to handle the exception or to get additional context that would have let me fix the problem. Is there an example of how to handle this situation? Ideally, the library would return a PCollection of filenames that failed to read, or something similar, for further processing instead of crashing the job.
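One possible workaround, sketched here under assumptions rather than taken from any Beam API, is to do the decompression yourself, one file at a time, inside a try/except. That way a bad file becomes a record carrying its filename instead of crashing the job. The helper `read_gzip_lines_safely` below is hypothetical and uses only the standard library. It takes a filename and the file's raw (still compressed) bytes. It yields either `('ok', filename, line)` per line, or a single `('error', filename, message)` record when the gzip stream is corrupt or truncated.

In a pipeline, the idea would be to match files (for instance with `fileio.MatchFiles` and `fileio.ReadMatches`), call a helper like this inside a DoFn, and send the error records to a tagged output via `beam.pvalue.TaggedOutput`. That wiring is not shown here. Check the `fileio` docs for how to get the raw bytes: `ReadableFile` may decompress `.gz` files automatically, in which case the try/except belongs around that read instead.

```python
import gzip
import io
import zlib
from typing import Iterator, Tuple

# Errors a damaged gzip stream can raise while being read:
#   OSError (including gzip.BadGzipFile) for a bad or missing gzip header,
#   EOFError for a stream that ends before its end-of-stream marker (truncation),
#   zlib.error for corrupt deflate data.
DECOMPRESSION_ERRORS = (OSError, EOFError, zlib.error)


def read_gzip_lines_safely(filename: str, raw: bytes) -> Iterator[Tuple[str, str, str]]:
    """Hypothetical helper: decode one gzip-compressed text file.

    Yields ('ok', filename, line) for each line, or exactly one
    ('error', filename, message) record if decompression fails.
    """
    lines = []
    try:
        with gzip.open(io.BytesIO(raw), mode='rt', encoding='utf-8') as fh:
            for line in fh:
                lines.append(line.rstrip('\n'))
    except DECOMPRESSION_ERRORS as err:
        # Emit nothing from a damaged file except the error record.
        yield ('error', filename, f'{type(err).__name__}: {err}')
        return
    for line in lines:
        yield ('ok', filename, line)


if __name__ == '__main__':
    good = gzip.compress(b'a,b\n1,2\n')
    print(list(read_gzip_lines_safely('good.csv.gz', good)))
    # Dropping the tail simulates a file truncated in transit.
    print(list(read_gzip_lines_safely('bad.csv.gz', good[:-10])))
```

The helper buffers a file's lines before emitting them, so a damaged file contributes either all of its rows or none. This trades some memory per file for not having half-read files in the output; for very large files, emitting lines as they are read may be preferable.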