omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678033
########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ########## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; - outputStream.apply( - "log_result", - MapElements.via( - new SimpleFunction<Row, Row>() { - @Override - public Row apply(Row input) { - // expect output: - // PCOLLECTION: [3, row, 3.0] - // PCOLLECTION: [2, row, 2.0] - System.out.println("PCOLLECTION: " + input.getValues()); - return input; - } - })); + outputStream + .apply( + "log_result", + MapElements.via( + new SimpleFunction<Row, Row>() { + @Override + public Row apply(Row input) { + // expect output: + // PCOLLECTION: [3, row, 3.0] + // PCOLLECTION: [2, row, 2.0] + System.out.println("PCOLLECTION: " + input.getValues()); + return input; + } + })) + .setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection<Row> outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; - outputStream2.apply( - "log_result", - MapElements.via( - new SimpleFunction<Row, Row>() { - @Override - public Row apply(Row input) { - // expect output: - // CASE1_RESULT: [row, 5.0] - System.out.println("CASE1_RESULT: " + input.getValues()); - return input; - } - })); + outputStream2 + .apply( + "log_result", + MapElements.via( + new SimpleFunction<Row, Row>() { + @Override + public Row apply(Row input) { + // expect output: + // CASE1_RESULT: [row, 5.0] + System.out.println("CASE1_RESULT: " + input.getValues()); + return input; + } + })) + .setCoder( + RowCoder.of( + Schema.builder() + .addStringField("stringField") + .addDoubleField("doubleField") + .build())); Review comment: I tried `setRowSchema(type)` and it failed with `java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer`. I think it is inferring the schema as 3 fields, but the result only returns 2 fields (`c2` and `sum(c3)`), and that's why it throws the error. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org