[ https://issues.apache.org/jira/browse/BEAM-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108415#comment-17108415 ]

Brendan Stennett edited comment on BEAM-9814 at 5/15/20, 4:02 PM:
------------------------------------------------------------------

[~reuvenlax] This appears even in simple examples that don't manipulate the RowCoder:

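{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.Row;

// options is a PipelineOptions subtype that exposes getOutput().
Pipeline p = Pipeline.create(options);

// Single-field schema for the intermediate rows.
Schema schema = Schema.builder()
        .addField("value", Schema.FieldType.STRING)
        .build();

p.apply(Create.of("row1", "row2", "row3"))
        // Wrap each input string in a Row; no coder is set on this output.
        .apply(ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(@Element String input, OutputReceiver<Row> out) {
                Row row = Row.withSchema(schema)
                        .addValue(input)
                        .build();

                out.output(row);
            }
        }))
        // Read the field back out of each Row.
        .apply(ParDo.of(new DoFn<Row, String>() {
            @ProcessElement
            public void processElement(@Element Row row, OutputReceiver<String> out) {
                out.output(row.getString("value"));
            }
        }))
        .apply("Write", TextIO.write().to(options.getOutput()));
{code}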

Edit: This exact example works in 2.20.0 but not in 2.21.0-SNAPSHOT or 2.22.0-SNAPSHOT.
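A possible workaround for the example above, sketched under the assumption that it behaves like the {{.setCoder(RowCoder.of(newSchema))}} workaround from the issue description: attach the schema to the ParDo output explicitly with {{PCollection.setRowSchema}}, so Beam does not have to infer a coder for {{Row}}. The class {{RowSchemaWorkaround}} and the helper {{buildRows}} are hypothetical names, and this has not been checked against the affected snapshots.

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class RowSchemaWorkaround {
  // Same single-field schema as in the example above.
  static final Schema SCHEMA = Schema.builder().addStringField("value").build();

  // Hypothetical helper: builds the Create -> ParDo part of the example and
  // attaches the Row schema explicitly instead of relying on coder inference.
  static PCollection<Row> buildRows(Pipeline p) {
    return p.apply(Create.of("row1", "row2", "row3"))
        .apply(ParDo.of(new DoFn<String, Row>() {
          @ProcessElement
          public void processElement(@Element String input, OutputReceiver<Row> out) {
            out.output(Row.withSchema(SCHEMA).addValue(input).build());
          }
        }))
        .setRowSchema(SCHEMA);
  }

  public static void main(String[] args) {
    // Only constructs the pipeline graph; running it needs a runner on the classpath.
    Pipeline p = Pipeline.create();
    PCollection<Row> rows = buildRows(p);
    System.out.println("has schema: " + rows.hasSchema());
  }
}
{code}

Downstream transforms such as the second ParDo and {{TextIO.write()}} can then be applied to the returned collection unchanged.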


> Error occurred when transforming from row to a new row without setCoder
> -----------------------------------------------------------------------
>
>                 Key: BEAM-9814
>                 URL: https://issues.apache.org/jira/browse/BEAM-9814
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.19.0
>            Reporter: Ruixue Liao
>            Assignee: Reuven Lax
>            Priority: Major
>
> The output row from the transform function is checked against the input row's schema, which causes an error. Example:
> {code}
>         .apply(MapElements.via(
>                 new SimpleFunction<Row, Row>() {
>                     @Override
>                     public Row apply(Row line) {
>                         return Row.withSchema(newSchema).addValues("a", 1, "b").build();
>                     }
>                 }));
> {code}
> An error occurs when the output row's schema differs from the input row's schema.
> As a workaround, {{.setCoder(RowCoder.of(newSchema))}} must be added after the transform function.
> Related link: 
> [https://stackoverflow.com/questions/61236972/beam-sql-udf-to-split-one-column-into-multiple-columns]
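A self-contained sketch of the workaround described in the issue, applied to the MapElements example. The issue does not define {{newSchema}}; here it is assumed to be a STRING, INT32, STRING schema so that {{addValues("a", 1, "b")}} type-checks. The names {{NewSchemaMapping}}, {{INPUT_SCHEMA}}, and {{NEW_SCHEMA}} are hypothetical.

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class NewSchemaMapping {
  // Hypothetical input schema: one string column.
  static final Schema INPUT_SCHEMA = Schema.builder().addStringField("line").build();

  // Hypothetical output schema whose field types match addValues("a", 1, "b").
  static final Schema NEW_SCHEMA =
      Schema.builder()
          .addStringField("first")
          .addInt32Field("count")
          .addStringField("last")
          .build();

  static PCollection<Row> transform(Pipeline p) {
    // Input rows carry an explicit coder for INPUT_SCHEMA.
    PCollection<Row> input =
        p.apply(
            Create.of(Row.withSchema(INPUT_SCHEMA).addValue("x").build())
                .withCoder(RowCoder.of(INPUT_SCHEMA)));
    return input
        .apply(
            MapElements.via(
                new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row line) {
                    // Output rows use a different schema from the input rows.
                    return Row.withSchema(NEW_SCHEMA).addValues("a", 1, "b").build();
                  }
                }))
        // Workaround from the issue: set the output coder from the new schema explicitly.
        .setCoder(RowCoder.of(NEW_SCHEMA));
  }

  public static void main(String[] args) {
    // Only constructs the pipeline graph; it is not run here.
    Pipeline p = Pipeline.create();
    System.out.println(transform(p).getCoder());
  }
}
{code}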



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
