[ https://issues.apache.org/jira/browse/BEAM-10996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208270#comment-17208270 ]
Ning Kang commented on BEAM-10996:
----------------------------------

First, to clarify: this is not related to the notebook; the pipeline fails no matter where it runs.

Second, there are 2 problems with the pipeline:

1. When using `beam.FlatMap`, the 3 serialized strings are flattened into dozens of integers. That is why you get the assertion error: the elements of the PCollection are now of type `int`. You should use `beam.Map` instead of `beam.FlatMap`. If the flattening is intended, you have to convert the int elements into strings, for example by appending another transform, `beam.Map(lambda x: str(x))`.
2. The output of `SerializeToString` is not decodable by Beam. You can wrap it as `return base64.b64encode(tfexample.SerializeToString()).decode('utf-8')`.

> AssertionError: (10, <class 'int'>) when writing TF Records
> -----------------------------------------------------------
>
>                 Key: BEAM-10996
>                 URL: https://issues.apache.org/jira/browse/BEAM-10996
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.22.0
>            Reporter: Valliappa Lakshmanan
>            Assignee: Ning Kang
>            Priority: P1
>         Attachments: repro.py
>
>
> This code snippet:
>
> def create_tfrecord(x):
>     size = np.array([2.0, 3.0])
>     tfexample = tf.train.Example(
>         features=tf.train.Features(
>             feature={
>                 'size': tf.train.Feature(float_list=tf.train.FloatList(value=size))
>             }))
>     return tfexample.SerializeToString()
>
> ...
> beam.FlatMap(lambda x: create_tfrecord(x))
> ...
>
> throws this error:
>
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
>     self.writer.write(element)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
>     self.sink.write_record(self.temp_handle, value)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 146, in write_record
>     self.write_encoded_record(file_handle, self.coder.encode(value))
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 463, in encode
>     return self.get_impl().encode(value)
>   File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.BytesCoderImpl.encode
>   File "apache_beam/coders/coder_impl.py", line 495, in apache_beam.coders.coder_impl.BytesCoderImpl.encode
> AssertionError: (10, <class 'int'>)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
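
For readers of this thread, the `FlatMap` point in the comment can be reproduced without Beam or TensorFlow. The sketch below is only an illustration under stated assumptions: `flat_map` and `map_` are hypothetical stand-ins that mimic how `beam.FlatMap` and `beam.Map` treat a function's return value, and `serialized` is a made-up 4-byte value standing in for the output of `tfexample.SerializeToString()`. Its first byte is 10, which would be consistent with the `(10, <class 'int'>)` in the reported assertion.

```python
# Hypothetical stand-in for tfexample.SerializeToString(); any bytes object
# behaves the same way when it is iterated.
serialized = b"\n\x12\n\x10"


def flat_map(fn, elements):
    """Mimics beam.FlatMap: each item of fn's return value becomes an element."""
    for element in elements:
        yield from fn(element)


def map_(fn, elements):
    """Mimics beam.Map: fn's return value becomes exactly one element."""
    for element in elements:
        yield fn(element)


# Iterating a bytes object in Python 3 yields ints, so the flattened output
# contains integers rather than the serialized record.
flat_out = list(flat_map(lambda _: serialized, [0]))
map_out = list(map_(lambda _: serialized, [0]))

print(flat_out)  # [10, 18, 10, 16]
print(map_out)   # [b'\n\x12\n\x10']
```

In the pipeline from the report, the corresponding change suggested in the comment is to use `beam.Map` in place of `beam.FlatMap` for `create_tfrecord`.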