
I've done some serious debugging and traced the problem to what seems to be
the root cause.
The class
does not have a mapping from java.util.Map to SqlTypeName.MAP.
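To make the missing mapping concrete, here is a minimal, self-contained sketch of a class-to-SQL-type lookup that falls back to OTHER for any class it does not know. The enum SqlTypeNameSketch and the baseTable/toSqlType helpers are hypothetical stand-ins, not Calcite's actual SqlTypeName or type-factory code; they only illustrate the fallback behaviour described here.

```java
import java.util.HashMap;
import java.util.Map;

public class TypeMappingSketch {

    // Hypothetical stand-in for Calcite's SqlTypeName, reduced to a few values.
    enum SqlTypeNameSketch { VARCHAR, INTEGER, MAP, OTHER }

    // Hypothetical Java-class -> SQL-type table. java.util.Map is deliberately
    // absent, mirroring the missing mapping.
    static Map<Class<?>, SqlTypeNameSketch> baseTable() {
        Map<Class<?>, SqlTypeNameSketch> table = new HashMap<>();
        table.put(String.class, SqlTypeNameSketch.VARCHAR);
        table.put(Integer.class, SqlTypeNameSketch.INTEGER);
        return table;
    }

    // Any class without an entry silently falls back to OTHER.
    static SqlTypeNameSketch toSqlType(Map<Class<?>, SqlTypeNameSketch> table, Class<?> javaClass) {
        return table.getOrDefault(javaClass, SqlTypeNameSketch.OTHER);
    }

    public static void main(String[] args) {
        Map<Class<?>, SqlTypeNameSketch> current = baseTable();
        System.out.println(toSqlType(current, String.class)); // VARCHAR
        System.out.println(toSqlType(current, Map.class));    // OTHER

        // With the missing entry added, the same lookup yields MAP.
        Map<Class<?>, SqlTypeNameSketch> fixed = baseTable();
        fixed.put(Map.class, SqlTypeNameSketch.MAP);
        System.out.println(toSqlType(fixed, Map.class));      // MAP
    }
}
```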

As a consequence, the JavaType(MAP) is converted into SqlTypeName.OTHER,
which breaks everything downstream.
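The "downstream" failure can be sketched the same way: if the SQL type has no counterpart in the schema's field types, the lookup returns null, and a builder that rejects null throws the NullPointerException "Null type" seen in the quoted stack trace. All names below (DownstreamFailureSketch, FieldTypeSketch, fieldOf) are hypothetical; the real path goes through CalciteUtils.toField and Schema.Field.of, whose internals this sketch only imitates under that assumption.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;

public class DownstreamFailureSketch {

    // Hypothetical stand-ins; not Calcite's SqlTypeName or Beam's Schema.FieldType.
    enum SqlTypeNameSketch { VARCHAR, MAP, OTHER }
    enum FieldTypeSketch { STRING, MAP_STRING_STRING }

    // SQL type -> schema field type. OTHER has no entry, so looking it up yields null.
    static final Map<SqlTypeNameSketch, FieldTypeSketch> SQL_TO_FIELD =
        new EnumMap<>(SqlTypeNameSketch.class);

    static {
        SQL_TO_FIELD.put(SqlTypeNameSketch.VARCHAR, FieldTypeSketch.STRING);
        SQL_TO_FIELD.put(SqlTypeNameSketch.MAP, FieldTypeSketch.MAP_STRING_STRING);
    }

    // Mimics a field factory whose builder refuses a null type.
    static String fieldOf(String name, SqlTypeNameSketch sqlType) {
        FieldTypeSketch type = Objects.requireNonNull(SQL_TO_FIELD.get(sqlType), "Null type");
        return name + ": " + type;
    }

    public static void main(String[] args) {
        System.out.println(fieldOf("barbar", SqlTypeNameSketch.VARCHAR)); // barbar: STRING
        try {
            fieldOf("foobar", SqlTypeNameSketch.OTHER);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: " + e.getMessage()); // NullPointerException: Null type
        }
    }
}
```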

The nice thing is that this root cause seems to be fixed in


which has not yet been released ...

I created https://issues.apache.org/jira/browse/BEAM-9267 to track this.
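The raw class that such a lookup receives can be checked with plain JDK reflection. This is an assumption about how the UDF's return type is obtained, not a description of Beam's or Calcite's actual code: the sketch reads the apply method of a local copy of FooMap, which here implements java.util.function.Function instead of Beam's SerializableFunction so that it compiles without Beam. It shows that the raw return class is just java.util.Map, while the key and value types survive only in the generic signature.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ReturnTypeSketch {

    // Local copy of the FooMap UDF from the quoted mail. It implements
    // java.util.function.Function instead of Beam's SerializableFunction
    // so that this sketch compiles with the JDK alone.
    public static class FooMap implements Function<String, Map<String, String>> {
        @Override
        public Map<String, String> apply(String input) {
            final Map<String, String> hashMap = new HashMap<>();
            hashMap.put("Some", "Thing");
            return hashMap;
        }
    }

    // Returns the concrete apply(String) method, not the compiler-generated
    // bridge method apply(Object) that returns Object.
    static Method applyMethod() {
        try {
            return FooMap.class.getMethod("apply", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Method apply = applyMethod();
        // The raw class is all that a class-keyed lookup would see.
        System.out.println(apply.getReturnType().getName()); // java.util.Map
        // The key and value types are only visible in the generic signature.
        System.out.println(apply.getGenericReturnType().getTypeName()); // java.util.Map<java.lang.String, java.lang.String>
    }
}
```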

Niels Basjes

On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
> My context: Java 8, Beam 2.19.0
> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
> I have a library ( https://yauaa.basjes.nl ) that people would like to
> use in combination with Beam-SQL.
> The essence of this library is that a String goes in and a Key-Value set
> (Map<String, String>) comes out.
> I've already done this for Flink-SQL and there it was relatively easy:
> just implement the appropriate function and specify that the return type is
> a Map<String, String>.
> See
> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
> Now for Beam-SQL I've been able to implement a function that returns a
> String, but if I try to return a Map<String, String> I get a nasty error.
> If I use this smallest possible function in my SQL (Calcite):
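> import java.util.HashMap;
> import java.util.Map;
> import org.apache.beam.sdk.transforms.SerializableFunction;
>
> public class FooMap implements SerializableFunction<String, Map<String, String>> {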
>     @Override
>     public Map<String, String> apply(String input) {
>         final HashMap<String, String> hashMap = new HashMap<>();
>         hashMap.put("Some", "Thing");
>         return hashMap;
>     }
> }
> I get this error:
> java.lang.NullPointerException: Null type
>     at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
>     at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Iterator.forEachRemaining(Iterator.java:116)
>     at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
>     at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
>     at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
> If I have a function that returns a String, it all works as expected:
> public class BarString implements SerializableFunction<String, String> {
>     @Override
>     public String apply(String input) {
>         return new StringBuilder(input).reverse().toString();
>     }
> }
> I already had a look at the Beam source code and I have not yet been able to
> figure out how I can explicitly tell the framework the Schema that my
> function returns (like I did in the Flink implementation of the same).
> I found the built-in functions that use a @UDF annotation that specifies the
> returnType ... but as far as I can tell this returnType is never actually
> used. So I cannot trace how it is done there.
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
> So my question is really simple: How do I do this correctly?
> NOTE: The code I run the tests with:
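> import java.io.Serializable;
> import java.util.Arrays;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.extensions.sql.SqlTransform;
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.testing.NeedsRunner;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.ValidatesRunner;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionTuple;
> import org.apache.beam.sdk.values.Row;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @Category(ValidatesRunner.class)
> public class TestFunctionReturnsMap implements Serializable {
>     private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);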
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>     @Test
>     @Category(NeedsRunner.class)
>     public void testUserAgentAnalysisSQL() {
>         // ============================================================
>         // Create input PCollection<Row>
>         Schema inputSchema = Schema
>             .builder()
>             .addStringField("bar")
>             .build();
>         PCollection<Row> input = pipeline
>             .apply(Create.of(Arrays.asList("One", "Two", "Three")))
>             .setCoder(StringUtf8Coder.of())
>             .apply(ParDo.of(new DoFn<String, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     c.output(Row
>                         .withSchema(inputSchema)
>                         .addValues(c.element())
>                         .build());
>                 }
>             })).setRowSchema(inputSchema);
>         // ============================================================
>         PCollection<Row> result =
>             // This way we give a name to the input stream for use in the SQL
>             PCollectionTuple.of("InputStream", input)
>                 // Apply the SQL with the UDFs we need.
>                 .apply("Execute SQL", SqlTransform
>                     .query(
>                         "SELECT" +
>                         "   bar             AS bar" +
>                         "  ,Bar(bar)        AS barbar " +
>                         "  ,Foo(bar)        AS foobar " + // If I remove this line the rest of the code works as expected.
>                         "FROM InputStream")
>                     .registerUdf("Foo",     new FooMap())
>                     .registerUdf("Bar",     new BarString())
>                 );
>         result.apply(ParDo.of(new RowPrinter()));
>         pipeline.run().waitUntilFinish();
>     }
>     public static class RowPrinter extends DoFn<Row, Row> {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             final Row row = c.element();
>             LOG.info("ROW: {} --> {}", row, row.getSchema());
>         }
>     }
> }
> --
> Best regards / Met vriendelijke groeten,
> Niels Basjes
