Good afternoon,

I am using Python 3.7.9 and the Python Beam SDK 2.29.0, under Darwin.

I attempted to run the coders.py cookbook example [1] in a stand-alone
way, but without success. Running it via the coders_test.py file [2]
works; however, when I try to use it to read a newline-delimited JSON
file, it fails.

This is the sample file (input.ndjson) I created, using the same data as
coders_test.py:
```
{"host": ["Germany", 1], "guest": ["Italy", 0]}
{"host": ["Germany", 1], "guest": ["Brasil", 3]}
{"host": ["Brasil", 1], "guest": ["Italy", 0]}
```

This is how I am trying to process it:
```
    python coders.py --input input.ndjson --output output.txt
```
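
For reference, the coder defined in the example looks roughly like this
(paraphrased from [1], so the details may differ slightly):
```
class JsonCoder(object):
  """A coder interpreting each line as a JSON record."""

  def encode(self, x):
    return json.dumps(x).encode('utf-8')

  def decode(self, x):
    return json.loads(x)
```
and the example passes it to the read transform as
ReadFromText(known_args.input, coder=JsonCoder()).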

And this is the relevant part of the output, ending in the traceback:
```
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7fadc69f6c10> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((ref_AppliedPTransform_read/Read/Impulse_4)+(ref_AppliedPTransform_read/Read/Map(<lambda at iobase.py:899>)_5))+(read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_2_split/Write)
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 219, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1311, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1322, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 354, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 418, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 261, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File "/private/tmp/dataflow_sandbox/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 749, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class '__main__.JsonCoder'>: it's not the same object as __main__.JsonCoder
```

Questions:
1. What am I doing wrong?
2. Is a coder the recommended approach for decoding NDJSON files, or is it
better to just use ParDo/Map transforms to deserialize them (see the
sketch after these questions for what I mean)?
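
By the ParDo/Map alternative I mean something along these lines (a
minimal, untested sketch; the transform labels are only illustrative):
```
import json

import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline() as p:
  (p
   | 'read' >> ReadFromText('input.ndjson')  # one element per line of text
   | 'parse' >> beam.Map(json.loads)         # deserialize each line into a dict
   | 'print' >> beam.Map(print))             # stand-in for a real sink
```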

Thank you in advance!

Kind regards,

Tatiana


[1]
https://github.com/apache/beam/blob/35bac6a62f1dc548ee908cfeff7f73ffcac38e6f/sdks/python/apache_beam/examples/cookbook/coders.py
[2]
https://github.com/apache/beam/blob/35bac6a62f1dc548ee908cfeff7f73ffcac38e6f/sdks/python/apache_beam/examples/cookbook/coders_test.py
