For some background, I am upgrading from Flink v1.9 to v1.11. So what I am
about to describe is our implementation on v1.9, which worked. I am trying
to achieve the same functionality on v1.11.

I have a DataStream whose type is an avro generated POJO, which contains a
field *UrlParameters* that is of type *Map<String, String>*. I register
this stream as a view so I can perform SQL queries on it. One of the
queries contains the UDF I have previously posted. It appears that in the
conversion to a view, the type of *UrlParameters* is being converted
into *RAW('java.util.Map',
?)*.


*Code on v1.9*

DataStream pings = // a Kafka stream source deserialized into an avro
generated POJO
tableEnvironment.registerDataStream("myTable", pings);
table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters,
'some_key') FROM myTable");
// tablesinks...


*The produced type of my deserializer is:*

@Override
public TypeInformation<Ping> getProducedType() {
    // Ping.class is an avro generated POJO
    return TypeInformation.of(Ping.class);
}

*Scalar UDF MAP_VALUE:*

public static String eval(final Map<String, String> map, final String key) {
    return map.get(key);
}


I an using a UDF to access fields in the *UrlParameters* map because if I
try to access them directly in the SQL (i.e. `*UrlParameters['some_key']*`),
I get the below exception. This stackoverflow[1] had suggested the UDF as a
work around.

Caused by: org.apache.flink.table.api.TableException: Type is not
supported: ANY
at
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalType(FlinkTypeFactory.scala:551)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:478)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$1(ExprCodeGenerator.scala:490)


This above implementation worked successfully on v1.9. We use a stream
source instead of a table source b/c we do other non-SQL type things with
the stream.


*Code on v1.11*

The following is the implementation on v1.11 which does not work. I was
using the Old Planner on v1.9 but have switched to the Blink Planner on
v1.11, in case that has any relevance here.


DataStream pings = // a Kafka stream source deserialized into an avro
generated POJO object
tableEnvironment.createTemporaryView("myTable", pings);
table = tableEnvironment.sqlQuery("SELECT MAP_VALUE(UrlParameters,
'some_key') FROM myTable");
// tablesinks...


The UDF referenced above produced the below error. So I assumed adding
DataTypeHints was the way to solve it but I was unable to get that to work.
That is what prompted the initial email to the ML.

Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
MAP_VALUE(map => MAP<STRING, STRING>, key => STRING)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
... 50 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
type at position 0. Data type MAP<STRING, STRING> expected but
RAW('java.util.Map', ?) passed.
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
... 51 more


I can try creating a concrete reproducible example if this explanation
isn't enough though its quite a bit with the avro POJO and custom
deserializer.


Thanks,

Steve


[1]
https://stackoverflow.com/questions/45621542/does-flink-sql-support-java-map-types

>

Reply via email to