Hi,

I've done some serious debugging and traced the problem to what seems to be the root cause: the class org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.JavaToSqlTypeConversionRules has no mapping from java.util.Map to SqlTypeName.MAP. As a consequence the JavaType(MAP) is converted into SqlTypeName.OTHER, which breaks everything downstream.
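To illustrate what goes wrong (this is a simplified model of the behaviour, NOT the actual Calcite code, and ConversionRulesSketch is a made-up name): the Java-to-SQL type conversion is essentially a lookup in a table from Java class to SqlTypeName, and any class without an entry falls through to OTHER:

import java.util.HashMap;
import java.util.Map;

// Simplified model of JavaToSqlTypeConversionRules (not the real code):
// the vendored Calcite 1.20 rule table has entries for types like String,
// but none for java.util.Map, so the lookup falls through to OTHER.
public class ConversionRulesSketch {
    enum SqlTypeName { VARCHAR, MAP, OTHER }

    private static final Map<Class<?>, SqlTypeName> RULES = new HashMap<>();
    static {
        RULES.put(String.class, SqlTypeName.VARCHAR);
        // What Calcite 1.20 is missing: RULES.put(Map.class, SqlTypeName.MAP);
    }

    static SqlTypeName lookup(Class<?> javaClass) {
        return RULES.getOrDefault(javaClass, SqlTypeName.OTHER);
    }

    public static void main(String[] args) {
        System.out.println(lookup(Map.class)); // prints OTHER, not MAP
    }
}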
The nice thing is that this root cause seems to already be fixed upstream in
https://issues.apache.org/jira/browse/CALCITE-3429
https://github.com/apache/calcite/commit/ff44204dc2899e0c34e94f70c2e0c301170daca3
which has not yet been released. I created https://issues.apache.org/jira/browse/BEAM-9267 to track this on the Beam side.
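If I read that commit correctly, it essentially adds the missing java.util.Map entry to the rule table. Once Beam picks up a Calcite release containing it, CalciteUtils.toField should be able to build a real map field for the UDF's return type instead of a null one. A sketch of the expected outcome (ExpectedFieldSketch is a made-up name; "foobar" matches the column alias in the quoted mail below):

import org.apache.beam.sdk.schemas.Schema;

// The Beam schema field I would expect CalciteUtils.toField to produce for
// a UDF returning Map<String, String>, once the Calcite fix is released.
public class ExpectedFieldSketch {
    public static void main(String[] args) {
        Schema.Field expected = Schema.Field.of(
            "foobar",
            Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING));
        System.out.println(expected);
    }
}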
Niels Basjes

On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> My context: Java 8, Beam 2.19.0
>
> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
>
> I have a library ( https://yauaa.basjes.nl ) that people would like to
> use in combination with Beam-SQL. The essence of this library is that a
> String goes in and a Key-Value set (Map<String, String>) comes out.
>
> I've already done this for Flink-SQL, where it was relatively easy: just
> implement the appropriate function and specify that the return type is a
> Map<String, String>. See
> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
>
> Now for Beam-SQL I've been able to implement a function that returns a
> String, but if I try to return a Map<String, String> I get a nasty error.
>
> If I use this smallest possible function in my SQL (Calcite):
>
> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>     @Override
>     public Map<String, String> apply(String input) {
>         final HashMap<String, String> hashMap = new HashMap<>();
>         hashMap.put("Some", "Thing");
>         return hashMap;
>     }
> }
>
> I get this error:
>
> java.lang.NullPointerException: Null type
>     at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
>     at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Iterator.forEachRemaining(Iterator.java:116)
>     at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>     at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>     at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>
> If I have a function that returns a String it all works as expected:
>
> public class BarString implements SerializableFunction<String, String> {
>     @Override
>     public String apply(String input) {
>         return new StringBuilder(input).reverse().toString();
>     }
> }
>
> I already had a look at the Beam source code and I have not yet been able
> to figure out how I can explicitly tell the framework the Schema that my
> function returns (like I did in the Flink implementation of the same).
>
> I found the built-in functions that use a @UDF annotation specifying the
> returnType ... but as far as I can tell this returnType is never actually
> used, so I cannot trace how it is done there.
>
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
>
> So my question is really simple: How do I do this correctly?
>
> NOTE: The code I run the tests with:
>
> @Category(ValidatesRunner.class)
> public class TestFunctionReturnsMap implements Serializable {
>
>     private static final Logger LOG =
>         LoggerFactory.getLogger(TestFunctionReturnsMap.class);
>
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>
>     @Test
>     @Category(NeedsRunner.class)
>     public void testUserAgentAnalysisSQL() {
>
>         // ============================================================
>         // Create input PCollection<Row>
>         Schema inputSchema = Schema
>             .builder()
>             .addStringField("bar")
>             .build();
>
>         PCollection<Row> input = pipeline
>             .apply(Create.of(Arrays.asList("One", "Two", "Three")))
>             .setCoder(StringUtf8Coder.of())
>             .apply(ParDo.of(new DoFn<String, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     c.output(Row
>                         .withSchema(inputSchema)
>                         .addValues(c.element())
>                         .build());
>                 }
>             })).setRowSchema(inputSchema);
>
>         // ============================================================
>
>         PCollection<Row> result =
>             // This way we give a name to the input stream for use in the SQL
>             PCollectionTuple.of("InputStream", input)
>                 // Apply the SQL with the UDFs we need.
>                 .apply("Execute SQL", SqlTransform
>                     .query(
>                         "SELECT" +
>                         " bar AS bar" +
>                         " ,Bar(bar) AS barbar " +
>                         " ,Foo(bar) AS foobar " + // If I remove this line the rest of the code works as expected.
>                         "FROM InputStream")
>                     .registerUdf("Foo", new FooMap())
>                     .registerUdf("Bar", new BarString()));
>
>         result.apply(ParDo.of(new RowPrinter()));
>
>         pipeline.run().waitUntilFinish();
>     }
>
>     public static class RowPrinter extends DoFn<Row, Row> {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             final Row row = c.element();
>             LOG.info("ROW: {} --> {}", row, row.getSchema());
>         }
>     }
> }
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes
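P.S. Until that fix lands, the only workaround I can think of is to avoid the Map return type altogether and expose each value through a String-returning UDF, since (as shown above) String return types are handled correctly. An untested sketch, where FooValue is a made-up name:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Hypothetical workaround: instead of returning the whole Map, return the
// value for a single requested key, because String return types are mapped
// correctly by the vendored Calcite.
public class FooValue implements SerializableFunction<String, String> {
    private final String key;

    public FooValue(String key) {
        this.key = key;
    }

    @Override
    public String apply(String input) {
        final Map<String, String> result = new HashMap<>();
        result.put("Some", "Thing");
        return result.get(key);
    }
}

which you would register once per field you need:

.registerUdf("FooSome", new FooValue("Some"))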