Thanks for reporting this and finding the root cause! Last I heard, Calcite was
going to start a release shortly. We plan to update once the next version
is out.


On Fri, Feb 7, 2020 at 4:38 AM Niels Basjes <> wrote:

> Hi,
> I've done some serious debugging and traced the problem to what seems to
> be the root cause.
> The class
> JavaToSqlTypeConversionRules
> does not have a mapping from java.util.Map to SqlTypeName.MAP.
> As a consequence, the JavaType(MAP) is converted into SqlTypeName.OTHER, which
> breaks everything downstream.
> The nice thing is that this root cause seems to be fixed in
> which has not yet been released ...
> I created to track this.
> Niels Basjes
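A minimal plain-Java sketch of the root cause described above. It assumes the UDF's SQL type is derived from the erased return type of `apply` (reflection only sees `java.util.Map`, not `Map<String, String>`) and that this class is then looked up in a Java-to-SQL rules table that falls back to OTHER. `SqlTypeNameSketch`, `FooMapLike` and `lookup` are hypothetical stand-ins, not the Calcite or Beam API, and `java.util.function.Function` replaces Beam's `SerializableFunction` so the sketch needs only the JDK.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TypeMappingSketch {
    // Hypothetical stand-in for Calcite's SqlTypeName; only a few values shown.
    public enum SqlTypeNameSketch { VARCHAR, INTEGER, MAP, OTHER }

    // Hypothetical UDF with the same shape as FooMap (String in, Map out).
    public static class FooMapLike implements Function<String, Map<String, String>> {
        @Override
        public Map<String, String> apply(String input) {
            Map<String, String> result = new HashMap<>();
            result.put("Some", "Thing");
            return result;
        }
    }

    // Looks up the SQL type for a Java class, falling back to OTHER when no rule exists.
    public static SqlTypeNameSketch lookup(Map<Class<?>, SqlTypeNameSketch> rules, Class<?> javaType) {
        return rules.getOrDefault(javaType, SqlTypeNameSketch.OTHER);
    }

    public static void main(String[] args) throws NoSuchMethodException {
        // Reflection only sees the erased return type: java.util.Map.
        Method apply = FooMapLike.class.getMethod("apply", String.class);
        Class<?> returnType = apply.getReturnType();

        // Rules without a Map entry, mimicking the gap described in the thread.
        Map<Class<?>, SqlTypeNameSketch> rules = new HashMap<>();
        rules.put(String.class, SqlTypeNameSketch.VARCHAR);
        rules.put(Integer.class, SqlTypeNameSketch.INTEGER);
        System.out.println(returnType.getName() + " -> " + lookup(rules, returnType)); // java.util.Map -> OTHER

        // Adding the missing mapping resolves the type to MAP instead.
        rules.put(Map.class, SqlTypeNameSketch.MAP);
        System.out.println(returnType.getName() + " -> " + lookup(rules, returnType)); // java.util.Map -> MAP
    }
}
```

With the `Map.class` entry present, the lookup no longer degrades to OTHER, which is the kind of mapping the root-cause analysis says is missing.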
> On Fri, Feb 7, 2020 at 11:26 AM Niels Basjes <> wrote:
>> Hi,
>> My context: Java 8, Beam 2.19.0
>> *TLDR*: How do I create a Beam-SQL UDF that returns a Map<String,
>> String>?
>> I have a library ( ) that people would like to
>> use in combination with Beam-SQL.
>> The essence of this library is that a String goes in and a key-value set
>> (Map<String, String>) comes out.
>> I've already done this for Flink-SQL and there it was relatively easy:
>> Just implement the appropriate function and specify that the return type is
>> a Map<String, String>.
>> See
>> Now for Beam-SQL I've been able to implement a function that returns a
>> String, but if I try to return a Map<String, String> I get a nasty error.
>> If I use the smallest possible function in my SQL (Calcite):
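>> import java.util.HashMap;
>> import java.util.Map;
>> import org.apache.beam.sdk.transforms.SerializableFunction;
>>
>> // Minimal UDF: ignores its input and returns a one-entry map.
>> public class FooMap implements SerializableFunction<String, Map<String, String>> {
>>     @Override
>>     public Map<String, String> apply(String input) {
>>         final HashMap<String, String> hashMap = new HashMap<>();
>>         hashMap.put("Some", "Thing");
>>         return hashMap;
>>     }
>> }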
>> I get this error:
>> java.lang.NullPointerException: Null type
>>     at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(...)
>>     at org.apache.beam.sdk.schemas.Schema$Field.of(...)
>>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(...)
>>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(...)
>>     at ...
>>     at java.util.Iterator.forEachRemaining(...)
>>     at java.util.Spliterators$IteratorSpliterator.forEachRemaining(...)
>>     at ...
>>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(...)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(...)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(...)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(...)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(...)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(...)
>>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(...)
>>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(...)
>>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(...)
>>     at org.apache.beam.sdk.Pipeline.applyInternal(...)
>>     at org.apache.beam.sdk.Pipeline.applyTransform(...)
>>     at org.apache.beam.sdk.values.PCollectionTuple.apply(...)
>>     at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(...)
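The `Null type` message is the one AutoValue-generated builders throw when a required property is set to null. Assuming `CalciteUtils.toField` looks up the Beam field type in a table that has no entry for `SqlTypeName.OTHER` (an assumption, not checked against the Beam source), the failure can be reproduced in plain Java. `SqlTypeNameSketch`, `FieldTypeSketch`, `FieldBuilder` and `CALCITE_TO_BEAM` are hypothetical stand-ins for the real classes.

```java
import java.util.EnumMap;
import java.util.Map;

public class NullTypeSketch {
    // Hypothetical stand-ins for Calcite's SqlTypeName and Beam's Schema.FieldType.
    public enum SqlTypeNameSketch { VARCHAR, MAP, OTHER }
    public enum FieldTypeSketch { STRING, MAP }

    // Hypothetical Calcite-to-Beam table; OTHER deliberately has no entry.
    static final Map<SqlTypeNameSketch, FieldTypeSketch> CALCITE_TO_BEAM =
        new EnumMap<>(SqlTypeNameSketch.class);

    static {
        CALCITE_TO_BEAM.put(SqlTypeNameSketch.VARCHAR, FieldTypeSketch.STRING);
        CALCITE_TO_BEAM.put(SqlTypeNameSketch.MAP, FieldTypeSketch.MAP);
    }

    // Mimics an AutoValue-generated builder: a null required property is rejected immediately.
    public static final class FieldBuilder {
        private FieldTypeSketch type;

        public FieldBuilder setType(FieldTypeSketch type) {
            if (type == null) {
                throw new NullPointerException("Null type");
            }
            this.type = type;
            return this;
        }

        public FieldTypeSketch type() {
            return type;
        }
    }

    // get() silently returns null for OTHER; the error only surfaces inside setType().
    public static FieldBuilder toField(SqlTypeNameSketch sqlType) {
        return new FieldBuilder().setType(CALCITE_TO_BEAM.get(sqlType));
    }

    public static void main(String[] args) {
        System.out.println(toField(SqlTypeNameSketch.VARCHAR).type()); // STRING
        try {
            toField(SqlTypeNameSketch.OTHER);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: " + e.getMessage()); // NullPointerException: Null type
        }
    }
}
```

This is why the trace points at `setType` rather than at the place where the type was first mapped to OTHER: the missing entry is only noticed when the builder validates its input.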
>> If I have a function that returns a String, it all works as expected:
>> public class BarString implements SerializableFunction<String, String> {
>>     @Override
>>     public String apply(String input) {
>>         return new StringBuilder(input).reverse().toString();
>>     }
>> }
>> I have already looked at the Beam source code and have not yet been able to
>> figure out how I can explicitly tell the framework the Schema that my
>> function returns (as I did in the Flink implementation of the same).
>> I found the built-in functions that use a @UDF annotation that specifies the
>> returnType ... but as far as I can tell this returnType is never actually
>> used, so I cannot trace how it is done there.
>> So my question is really simple: How do I do this correctly?
>> NOTE: The code I run the tests with:
>> @Category(ValidatesRunner.class)
>> public class TestFunctionReturnsMap implements Serializable {
>>     private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);
>>     @Rule
>>     public final transient TestPipeline pipeline = TestPipeline.create();
>>     @Test
>>     @Category(NeedsRunner.class)
>>     public void testUserAgentAnalysisSQL() {
>>         // ============================================================
>>         // Create input PCollection<Row>
>>         Schema inputSchema = Schema
>>             .builder()
>>             .addStringField("bar")
>>             .build();
>>         PCollection<Row> input = pipeline
>>             .apply(Create.of(Arrays.asList("One", "Two", "Three")))
>>             .setCoder(StringUtf8Coder.of())
>>             .apply(ParDo.of(new DoFn<String, Row>() {
>>                 @ProcessElement
>>                 public void processElement(ProcessContext c) {
>>                     c.output(Row
>>                         .withSchema(inputSchema)
>>                         .addValues(c.element())
>>                         .build());
>>                 }
>>             })).setRowSchema(inputSchema);
>>         // ============================================================
>>         PCollection<Row> result =
>>             // This way we give a name to the input stream for use in the SQL
>>             PCollectionTuple.of("InputStream", input)
>>                 // Apply the SQL with the UDFs we need.
>>                 .apply("Execute SQL", SqlTransform
>>                     .query(
>>                         "SELECT" +
>>                         "   bar             AS bar" +
>>                         "  ,Bar(bar)        AS barbar " +
>>                         // If I remove the line below, the rest of the code works as expected.
>>                         "  ,Foo(bar)        AS foobar " +
>>                         "FROM InputStream")
>>                     .registerUdf("Foo",     new FooMap())
>>                     .registerUdf("Bar",     new BarString())
>>                 );
>>         result.apply(ParDo.of(new RowPrinter()));
>>     }
>>     public static class RowPrinter extends DoFn<Row, Row> {
>>         @ProcessElement
>>         public void processElement(ProcessContext c) {
>>             final Row row = c.element();
>>             LOG.info("ROW: {} --> {}", row, row.getSchema());
>>         }
>>     }
>> }
>> --
>> Best regards,
>> Niels Basjes
