Hi Robert,

On Mon, May 16, 2022 at 12:33 PM Robert Bradshaw <rober...@google.com> wrote:
>
> On Mon, May 16, 2022 at 12:01 PM Deepak Nagaraj
> <deepak.naga...@primer.ai> wrote:
> >
>
> I can imagine contention for an I/O lock, but I'm not sure how that
> would lead to a deadlock. But hopefully knowing that print() is
> involved should allow a more minimal reproduction of the issue.
>

Yes, I've enclosed a minimal pipeline that reproduces the problem [1],
as well as the last set of logs [2]. Per the logs, the problem occurs
even with a single worker thread.

Thanks,
Deepak

[1] Minimal Beam pipeline that stalls due to deadlock:

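import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def _run_pipeline(pipeline):
    def process_data(unused):
        # Each element prints a 1000-character line to stdout.
        print('a' * 1000)

    _ = (
        pipeline
        | "Create" >> beam.Create(['a'] * 1000)
        | "Process" >> beam.Map(process_data)
    )
    pipeline.run().wait_until_finish()


if __name__ == '__main__':
    # Runner flags (e.g. for the external worker pool) are read from argv.
    _run_pipeline(beam.Pipeline(options=PipelineOptions()))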
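Following the suggestion to narrow this down further, here is a sketch of a Beam-free check of the same workload. It runs the identical print() loop on a single thread (mirroring "Currently using 1 threads." in the logs) and reports whether it finishes within a timeout. The helper name run_without_beam and the 30-second timeout are hypothetical choices for illustration, not part of Beam.

```python
import threading


def process_data(unused):
    # Same body as the "Process" step in the pipeline above.
    print('a' * 1000)


def run_without_beam(n=1000, timeout_s=30.0):
    """Hypothetical helper: run n print() calls on one worker thread.

    Returns True if the thread finishes within timeout_s seconds,
    False if it is still running (i.e. appears stalled).
    """
    # A daemon thread so a stalled worker cannot block interpreter exit.
    worker = threading.Thread(
        target=lambda: [process_data(x) for x in ['a'] * n], daemon=True)
    worker.start()
    worker.join(timeout_s)
    return not worker.is_alive()


if __name__ == '__main__':
    print('finished:', run_without_beam())
```

If this finishes when run directly but the pipeline still stalls, that would suggest print() alone is not enough to trigger the problem, and the worker environment's handling of stdout is a more likely place to look.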

[2] Last set of logs from the Python worker pool:

DEBUG:apache_beam.runners.worker.sdk_worker:Got work 90
DEBUG:apache_beam.runners.worker.sdk_worker:Currently using 1 threads.
DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation Process output_tags=['None'], receivers=[ConsumerSet[Process.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:apache_beam.runners.worker.bundle_processor:start <DoOperation Create/Map(decode) output_tags=['None'], receivers=[SingletonConsumerSet[Create/Map(decode).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
DEBUG:apache_beam.runners.worker.bundle_processor:start <DataInputOperation fn/read/ref_PCollection_PCollection_7:0 receivers=[SingletonConsumerSet[fn/read/ref_PCollection_PCollection_7:0.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
