Hi,

My context: Java 8, Beam 2.19.0

*TLDR*: How do I create a Beam-SQL UDF that returns a Map<String, String>?

I have a library ( https://yauaa.basjes.nl ) that people would like to use
in combination with Beam-SQL.
The essence of this library is that a String goes in and a Key-Value set
(Map<String, String>) comes out.

I've already done this for Flink-SQL, and there it was relatively easy: just
implement the appropriate function and specify that the return type is a
Map<String, String>.
See
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/main/java/nl/basjes/parse/useragent/flink/table/AnalyzeUseragentFunction.java#L88
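
For reference, the relevant part of that Flink UDF looks roughly like this
(Flink 1.x ScalarFunction API; I've elided the actual analysis, the
placeholder map is only there to illustrate the shape of the result):

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

public class AnalyzeUseragentFunction extends ScalarFunction {

    // Called by Flink-SQL for every row.
    public Map<String, String> eval(String userAgent) {
        // The real implementation runs the yauaa analyzer here;
        // this placeholder only illustrates the result shape.
        return Collections.singletonMap("DeviceClass", "Desktop");
    }

    // The key part: the UDF explicitly declares its own result type.
    @Override
    public TypeInformation<Map<String, String>> getResultType(Class<?>[] signature) {
        return TypeInformation.of(new TypeHint<Map<String, String>>() {});
    }
}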

Now for Beam-SQL I've been able to implement a function that returns a
String, but if I try to return a Map<String, String> I get a nasty error.

If I use this smallest possible function in my SQL (Calcite):

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.transforms.SerializableFunction;

public class FooMap implements SerializableFunction<String, Map<String, String>> {
    @Override
    public Map<String, String> apply(String input) {
        final HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("Some", "Thing");
        return hashMap;
    }
}

I get this error:

java.lang.NullPointerException: Null type
    at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
    at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
    at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)


Reading the stack trace, it looks like CalciteUtils fails to translate the Calcite type of my UDF's result back into a Beam Schema field type, which leaves the field type null. If I have a function that returns a String, it all works as expected:


import org.apache.beam.sdk.transforms.SerializableFunction;

public class BarString implements SerializableFunction<String, String> {
    @Override
    public String apply(String input) {
        return new StringBuilder(input).reverse().toString();
    }
}

I already had a look at the Beam source code, but I have not yet been
able to figure out how to explicitly tell the framework the Schema
that my function returns (like I did in the Flink implementation of
the same thing).
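
For what it's worth, Beam's Schema API itself seems perfectly capable of
describing the type I need. This is only my assumption of how it would be
expressed (the class and constant names are just for illustration); I
simply cannot find the hook to attach it to a registered UDF:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

public class ExpectedOutputSchema {
    // The Beam field type I would expect to declare for the UDF result:
    static final FieldType MAP_OF_STRINGS =
        FieldType.map(FieldType.STRING, FieldType.STRING);

    // ... which would make the output schema of my query look like this:
    static final Schema OUTPUT_SCHEMA = Schema.builder()
        .addStringField("bar")
        .addStringField("barbar")
        .addField("foobar", MAP_OF_STRINGS)
        .build();
}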

I found the built-in functions that use a @UDF annotation that
specifies the returnType ... but as far as I can tell this returnType
is never actually used, so I cannot trace how it is done there.

https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java


So my question is really simple: How do I do this correctly?


NOTE: The code I run the tests with:

@Category(ValidatesRunner.class)
public class TestFunctionReturnsMap implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    @Category(NeedsRunner.class)
    public void testUserAgentAnalysisSQL() {

        // ============================================================
        // Create input PCollection<Row>
        Schema inputSchema = Schema
            .builder()
            .addStringField("bar")
            .build();

        PCollection<Row> input = pipeline
            .apply(Create.of(Arrays.asList("One", "Two", "Three")))
            .setCoder(StringUtf8Coder.of())
            .apply(ParDo.of(new DoFn<String, Row>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(Row
                        .withSchema(inputSchema)
                        .addValues(c.element())
                        .build());
                }
            })).setRowSchema(inputSchema);


        // ============================================================

        PCollection<Row> result =
            // This way we give a name to the input stream for use in the SQL
            PCollectionTuple.of("InputStream", input)
                // Apply the SQL with the UDFs we need.
                .apply("Execute SQL", SqlTransform
                    .query(
                        "SELECT" +
                        "   bar             AS bar" +
                        "  ,Bar(bar)        AS barbar " +
                        "  ,Foo(bar)        AS foobar " + // If I remove this line, the rest of the code works as expected.
                        "FROM InputStream")
                    .registerUdf("Foo",     new FooMap())
                    .registerUdf("Bar",     new BarString())
                );

        result.apply(ParDo.of(new RowPrinter()));

        pipeline.run().waitUntilFinish();
    }

    public static class RowPrinter extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            final Row row = c.element();
            LOG.info("ROW: {} --> {}", row, row.getSchema());
        }
    }

}

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
