Hi,

I've done some serious debugging and traced the problem to what seems to be
the root cause: the class
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.JavaToSqlTypeConversionRules
does not have a mapping from java.util.Map to SqlTypeName.MAP.

As a consequence, the JavaType(MAP) is converted into SqlTypeName.OTHER,
which breaks everything downstream.
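
To illustrate (a minimal sketch; instance() and lookup() are the method names as
I read them in the vendored Calcite 1.20 source, and the throw-away class around
them is purely hypothetical):

import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.JavaToSqlTypeConversionRules;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;

public class ShowMissingMapRule {
    public static void main(String[] args) {
        // Calcite resolves the SQL type of a Java class through this rule table.
        JavaToSqlTypeConversionRules rules = JavaToSqlTypeConversionRules.instance();

        // There is no entry for java.util.Map, so the lookup comes back empty
        // and the planner falls back to SqlTypeName.OTHER for the UDF result type.
        SqlTypeName sqlType = rules.lookup(java.util.Map.class);
        System.out.println(sqlType); // expected to print null on Beam 2.19.0
    }
}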

The nice thing is that this root cause seems to have been fixed already:
https://issues.apache.org/jira/browse/CALCITE-3429
https://github.com/apache/calcite/commit/ff44204dc2899e0c34e94f70c2e0c301170daca3

However, that fix has not been released yet.

I created https://issues.apache.org/jira/browse/BEAM-9267 to track this.

Niels Basjes



On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> My context: Java 8, Beam 2.19.0
>
> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?
>
> I have a library ( https://yauaa.basjes.nl ) that people would like to
> use in combination with Beam-SQL.
> The essence of this library is that a String goes in and a key-value set
> (Map<String, String>) comes out.
>
> I've already done this for Flink-SQL and there it was relatively easy:
> Just implement the appropriate function and specify that the return type is
> a Map<String, String>.
> See
> https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
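>
> In essence the Flink version boils down to something like this (a trimmed-down
> sketch of the linked class, not the full implementation; the dummy map value is
> only there for illustration):
>
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class AnalyzeUseragentFunction extends ScalarFunction {
>     public Map<String, String> eval(String userAgent) {
>         // The real implementation runs the useragent analysis here.
>         final Map<String, String> result = new HashMap<>();
>         result.put("DeviceClass", "Desktop"); // dummy value for the sketch
>         return result;
>     }
>
>     @Override
>     public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
>         // This is the part I am looking for in Beam-SQL:
>         // explicitly telling the framework the result type is a Map<String, String>.
>         return Types.MAP(Types.STRING, Types.STRING);
>     }
> }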
>
> Now for Beam-SQL I've been able to implement a function that returns a
> String, but if I try to return a Map<String, String> I get a nasty error.
>
> If I use the smallest possible function in my SQL (Calcite)
>
> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>     @Override
>     public Map<String, String> apply(String input) {
>         final HashMap<String, String> hashMap = new HashMap<>();
>         hashMap.put("Some", "Thing");
>         return hashMap;
>     }
> }
>
> I get this error
>
>
> java.lang.NullPointerException: Null type
>
> at
> org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
> at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
> at
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
> at
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
> at
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
> at
> org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
> at
> nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
>
>
> If I have a function that returns a String it all works as expected
>
>
> public class BarString implements SerializableFunction<String, String> {
>     @Override
>     public String apply(String input) {
>         return new StringBuilder(input).reverse().toString();
>     }
> }
>
> I already had a look at the Beam source code, but I have not yet been able to
> figure out how I can explicitly tell the framework the Schema that my
> function returns (like I did in the Flink implementation).
>
> I found the built-in functions that use a @UDF annotation specifying the
> returnType ... but as far as I can tell this returnType is never actually
> used, so I cannot trace how it is done there.
>
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
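>
> (For completeness: the other registration path I know of is via BeamSqlUdf with
> an eval method, roughly like the sketch below. As far as I can tell the return
> type there is also derived only from the Java signature, so I do not see a way
> to declare the schema explicitly there either. This is a sketch only; I have
> not verified it with a Map return type.)
>
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
>
> public class FooMapUdf implements BeamSqlUdf {
>     // The return type is taken from this Java signature; I have not found a
>     // parameter that lets me declare the schema explicitly.
>     public static Map<String, String> eval(String input) {
>         final Map<String, String> result = new HashMap<>();
>         result.put("Some", "Thing");
>         return result;
>     }
> }
>
> // Registration would then be: .registerUdf("Foo", FooMapUdf.class)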
>
>
> So my question is really simple: How do I do this correctly?
>
>
> NOTE: The code I run the tests with:
>
> @Category(ValidatesRunner.class)
> public class TestFunctionReturnsMap implements Serializable {
>
>     private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);
>
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>
>     @Test
>     @Category(NeedsRunner.class)
>     public void testUserAgentAnalysisSQL() {
>
>         // ============================================================
>         // Create input PCollection<Row>
>         Schema inputSchema = Schema
>             .builder()
>             .addStringField("bar")
>             .build();
>
>         PCollection<Row> input = pipeline
>             .apply(Create.of(Arrays.asList("One", "Two", "Three")))
>             .setCoder(StringUtf8Coder.of())
>             .apply(ParDo.of(new DoFn<String, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     c.output(Row
>                         .withSchema(inputSchema)
>                         .addValues(c.element())
>                         .build());
>                 }
>             })).setRowSchema(inputSchema);
>
>
>         // ============================================================
>
>         PCollection<Row> result =
>             // This way we give a name to the input stream for use in the SQL
>             PCollectionTuple.of("InputStream", input)
>                 // Apply the SQL with the UDFs we need.
>                 .apply("Execute SQL", SqlTransform
>                     .query(
>                         "SELECT" +
>                         "   bar             AS bar" +
>                         "  ,Bar(bar)        AS barbar " +
>                         "  ,Foo(bar)        AS foobar " + // If I remove this line the rest of the code works as expected.
>                         "FROM InputStream")
>                     .registerUdf("Foo",     new FooMap())
>                     .registerUdf("Bar",     new BarString())
>                 );
>
>         result.apply(ParDo.of(new RowPrinter()));
>
>         pipeline.run().waitUntilFinish();
>     }
>
>     public static class RowPrinter extends DoFn<Row, Row> {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             final Row row = c.element();
>             LOG.info("ROW: {} --> {}", row, row.getSchema());
>         }
>     }
>
> }
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
