Thanks for reporting and finding the root cause! Last I heard, Calcite was going to start a release shortly. We plan to update once the next version is out.
Andrew

On Fri, Feb 7, 2020 at 4:38 AM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> I've done some serious debugging and traced the problem to what seems to
> be the root cause.
> The class
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.JavaToSqlTypeConversionRules
> does not have a mapping from java.util.Map to SqlTypeName.MAP.
>
> As a consequence, the JavaType(MAP) is converted into SqlTypeName.OTHER,
> which breaks everything downstream.
>
> The nice thing is that this root cause already seems to be fixed in
> https://issues.apache.org/jira/browse/CALCITE-3429
> https://github.com/apache/calcite/commit/ff44204dc2899e0c34e94f70c2e0c301170daca3
> which has not yet been released ...
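>
> To make the failure mode concrete, here is a self-contained sketch (NOT
> Calcite's actual code; all names below are made up for the example) of how
> a Java-class-to-SQL-type rule table behaves when a class has no entry:
> the lookup falls back to OTHER, which is exactly what happens for
> java.util.Map before the fix.
>
> import java.util.HashMap;
> import java.util.Map;
>
> public class ConversionRulesSketch {
>     enum SqlTypeName { INTEGER, VARCHAR, MAP, OTHER }
>
>     private static final Map<Class<?>, SqlTypeName> RULES = new HashMap<>();
>     static {
>         RULES.put(Integer.class, SqlTypeName.INTEGER);
>         RULES.put(String.class, SqlTypeName.VARCHAR);
>         // Before the fix there is no entry for java.util.Map here, so the
>         // lookup below falls through to OTHER. The fix presumably amounts
>         // to registering: RULES.put(Map.class, SqlTypeName.MAP);
>     }
>
>     static SqlTypeName lookup(Class<?> javaClass) {
>         // Missing classes get the catch-all OTHER type.
>         return RULES.getOrDefault(javaClass, SqlTypeName.OTHER);
>     }
>
>     public static void main(String[] args) {
>         System.out.println(lookup(Map.class)); // prints OTHER without the fix
>     }
> }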
> I created https://issues.apache.org/jira/browse/BEAM-9267 to track this.
>
> Niels Basjes
>
> On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> My context: Java 8, Beam 2.19.0
>>
>> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
>>
>> I have a library ( https://yauaa.basjes.nl ) that people would like to
>> use in combination with Beam-SQL.
>> The essence of this library is that a String goes in and a Key-Value set
>> (Map<String, String>) comes out.
>>
>> I've already done this for Flink-SQL, where it was relatively easy: just
>> implement the appropriate function and specify that the return type is a
>> Map<String, String>. See
>> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
>>
>> Now for Beam-SQL I've been able to implement a function that returns a
>> String, but if I try to return a Map<String, String> I get a nasty error.
>>
>> If I use this smallest possible function in my SQL (Calcite)
>>
>> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>>     @Override
>>     public Map<String, String> apply(String input) {
>>         final HashMap<String, String> hashMap = new HashMap<>();
>>         hashMap.put("Some", "Thing");
>>         return hashMap;
>>     }
>> }
>>
>> I get this error:
>>
>> java.lang.NullPointerException: Null type
>>     at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
>>     at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
>>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
>>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
>>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>>     at java.util.Iterator.forEachRemaining(Iterator.java:116)
>>     at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
>>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>>     at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>>     at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>>
>> If I have a function that returns a String it all works as expected:
>>
>> public class BarString implements SerializableFunction<String, String> {
>>     @Override
>>     public String apply(String input) {
>>         return new StringBuilder(input).reverse().toString();
>>     }
>> }
>>
>> I already had a look at the Beam source code, but I have not yet been
>> able to figure out how I can explicitly tell the framework the Schema
>> that my function returns (like I did in the Flink implementation of the
>> same).
>>
>> I found the built-in functions that use an @UDF annotation that
>> specifies the returnType ... but as far as I can tell this returnType is
>> never actually used, so I cannot trace how it is done there.
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
>>
>> So my question is really simple: How do I do this correctly?
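>>
>> In the meantime, a stopgap that does work today, given that
>> String-returning UDFs are fine, is to encode the key/value pairs into a
>> single String until the Calcite fix ships. A minimal sketch (same imports
>> as FooMap above; the "key=value;" encoding is purely illustrative, a real
>> version would probably use JSON):
>>
>> public class FooMapAsString implements SerializableFunction<String, String> {
>>     @Override
>>     public String apply(String input) {
>>         final Map<String, String> map = new HashMap<>();
>>         map.put("Some", "Thing");
>>         // Flatten the map into one String so Beam-SQL sees a plain VARCHAR.
>>         final StringBuilder sb = new StringBuilder();
>>         for (Map.Entry<String, String> entry : map.entrySet()) {
>>             sb.append(entry.getKey()).append('=').append(entry.getValue()).append(';');
>>         }
>>         return sb.toString();
>>     }
>> }
>>
>> It registers exactly like the others: .registerUdf("Foo", new FooMapAsString()).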
>> "FROM InputStream") >> .registerUdf("Foo", new FooMap()) >> .registerUdf("Bar", new BarString()) >> ); >> >> result.apply(ParDo.of(new RowPrinter())); >> >> pipeline.run().waitUntilFinish(); >> } >> >> public static class RowPrinter extends DoFn<Row, Row> { >> @ProcessElement >> public void processElement(ProcessContext c) { >> final Row row = c.element(); >> LOG.info("ROW: {} --> {}", row, row.getSchema()); >> } >> } >> >> } >> >> >> >> >> >> >> >> >> >> >> >> >> -- >> Best regards / Met vriendelijke groeten, >> >> Niels Basjes >> > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes >