Hi,

My context: Java 8, Beam 2.19.0

*TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?

I have a library ( https://yauaa.basjes.nl ) that people would like to use in combination with Beam-SQL. The essence of this library is that a String goes in and a key-value set (Map<String, String>) comes out.

I've already done this for Flink-SQL and there it was relatively easy: just implement the appropriate function and specify that the return type is a Map<String, String>. See
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88

Now for Beam-SQL I've been able to implement a function that returns a String, but if I try to return a Map<String, String> I get a nasty error.

If I use this smallest possible function in my SQL (Calcite):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    public class FooMap implements SerializableFunction<String, Map<String, String>> {
        @Override
        public Map<String, String> apply(String input) {
            // Ignores the input and always returns the same single entry.
            final HashMap<String, String> hashMap = new HashMap<>();
            hashMap.put("Some", "Thing");
            return hashMap;
        }
    }

I get this error:

    java.lang.NullPointerException: Null type
        at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
        at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
        at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
        at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
        at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
        at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
        at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
        at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
        at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
        at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)

If I have a function that returns a String it all works as expected:

    import org.apache.beam.sdk.transforms.SerializableFunction;

    public class BarString implements SerializableFunction<String, String> {
        @Override
        public String apply(String input) {
            // Returns the input with its characters in reverse order.
            return new StringBuilder(input).reverse().toString();
        }
    }

I already had a look at the Beam source code and I have not yet been able to figure out how I can explicitly tell the framework which Schema my function returns (like I did in the Flink implementation of the same function).

I found the built-in functions that use a @UDF annotation that specifies the returnType ... but as far as I can tell this returnType is never actually used. So I cannot trace how it is done there.
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java

So my question is really simple: How do I do this correctly?

NOTE: The code I run the tests with:

    import java.io.Serializable;
    import java.util.Arrays;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.testing.NeedsRunner;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.testing.ValidatesRunner;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @Category(ValidatesRunner.class)
    public class TestFunctionReturnsMap implements Serializable {

        private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);

        @Rule
        public final transient TestPipeline pipeline = TestPipeline.create();

        @Test
        @Category(NeedsRunner.class)
        public void testUserAgentAnalysisSQL() {

            // ============================================================
            // Create input PCollection<Row>
            Schema inputSchema = Schema
                .builder()
                .addStringField("bar")
                .build();

            PCollection<Row> input = pipeline
                .apply(Create.of(Arrays.asList("One", "Two", "Three")))
                .setCoder(StringUtf8Coder.of())
                .apply(ParDo.of(new DoFn<String, Row>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(Row
                            .withSchema(inputSchema)
                            .addValues(c.element())
                            .build());
                    }
                })).setRowSchema(inputSchema);

            // ============================================================

            PCollection<Row> result =
                // This way we give a name to the input stream for use in the SQL
                PCollectionTuple.of("InputStream", input)
                    // Apply the SQL with the UDFs we need.
                    .apply("Execute SQL", SqlTransform
                        .query(
                            "SELECT" +
                            "    bar             AS bar" +
                            "   ,Bar(bar)        AS barbar " +
                            "   ,Foo(bar)        AS foobar " + // If I remove this line the rest of the code works as expected.
                            "FROM InputStream")
                        .registerUdf("Foo", new FooMap())
                        .registerUdf("Bar", new BarString())
                    );

            result.apply(ParDo.of(new RowPrinter()));

            pipeline.run().waitUntilFinish();
        }

        public static class RowPrinter extends DoFn<Row, Row> {
            @ProcessElement
            public void processElement(ProcessContext c) {
                final Row row = c.element();
                LOG.info("ROW: {} --> {}", row, row.getSchema());
            }
        }
    }

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes