[ 
https://issues.apache.org/jira/browse/BEAM-10996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208270#comment-17208270
 ] 

Ning Kang commented on BEAM-10996:
----------------------------------

First, to clarify: this is not related to the notebook. The pipeline fails 
no matter where it runs.
Second, there are 2 problems with the pipeline:
1. `beam.FlatMap` iterates over whatever the function returns and emits each 
   item as a separate element. Iterating over a `bytes` object in Python 3 
   yields integers, so the 3 serialized strings are flattened into dozens of 
   integers. That is the cause of the assertion error: the elements in the 
   PCollection are now of type `int`. Use `beam.Map` instead of 
   `beam.FlatMap`. If the flattening is intended, convert the int elements 
   into strings, for example by appending another transform 
   `beam.Map(lambda x: str(x))`.
2. The output of `SerializeToString` is not decodable by Beam. You can wrap 
   it as 
   `return base64.b64encode(tfexample.SerializeToString()).decode('utf-8')`.
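
To illustrate both points without running Beam, here is a minimal plain-Python 
sketch. `flat_map` and `map_each` are hypothetical stand-ins that only mimic 
the element-level behavior of `beam.FlatMap` and `beam.Map`, and `serialized` 
is a made-up byte string standing in for the result of 
`tfexample.SerializeToString()`; none of these names come from the repro.

```python
import base64

# Assumption: a short made-up byte string standing in for
# tfexample.SerializeToString(). Any bytes object behaves the same way.
serialized = b"\n\x16\n\x14"


def flat_map(fn, elements):
    """Hypothetical stand-in for beam.FlatMap: emits each item of fn's result."""
    return [item for element in elements for item in fn(element)]


def map_each(fn, elements):
    """Hypothetical stand-in for beam.Map: emits fn's result as one element."""
    return [fn(element) for element in elements]


inputs = [0, 1, 2]

# FlatMap-style: iterating over bytes yields ints, one per byte.
flat = flat_map(lambda x: serialized, inputs)

# Map-style: each input produces exactly one bytes element.
mapped = map_each(lambda x: serialized, inputs)

# Map-style with the base64 wrapping suggested above: one str per input.
encoded = map_each(
    lambda x: base64.b64encode(serialized).decode("utf-8"), inputs
)

print(type(flat[0]).__name__, len(flat))
print(type(mapped[0]).__name__, len(mapped))
print(type(encoded[0]).__name__, len(encoded))
```

With the `FlatMap`-style helper, 3 inputs turn into 12 `int` elements (one 
per byte), while the `Map`-style helper keeps 3 elements, each a whole 
`bytes` or `str` value.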

> AssertionError: (10, <class 'int'>) when writing TF Records
> -----------------------------------------------------------
>
>                 Key: BEAM-10996
>                 URL: https://issues.apache.org/jira/browse/BEAM-10996
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.22.0
>            Reporter: Valliappa Lakshmanan
>            Assignee: Ning Kang
>            Priority: P1
>         Attachments: repro.py
>
>
> This code snippet:
>  
> def create_tfrecord(x):
>   size = np.array([2.0, 3.0])
>   tfexample = tf.train.Example( 
>      features=tf.train.Features(
>         feature={
>            'size': tf.train.Feature(float_list=tf.train.FloatList(value=size))
>    }))
>   return tfexample.SerializeToString()
>  
> ...
> beam.FlatMap(lambda x: create_tfrecord(x))
> ...
>  
> throws this error:
>  
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
>     self.writer.write(element)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
>     self.sink.write_record(self.temp_handle, value)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 146, in write_record
>     self.write_encoded_record(file_handle, self.coder.encode(value))
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 463, in encode
>     return self.get_impl().encode(value)
>   File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.BytesCoderImpl.encode
>   File "apache_beam/coders/coder_impl.py", line 495, in apache_beam.coders.coder_impl.BytesCoderImpl.encode
> AssertionError: (10, <class 'int'>)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
