Hi!

You can process each record of the source stream (for example with a map
function) to convert the CharSequence into a String, and then convert the
stream into a table.
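
A minimal sketch of that idea, assuming a recent Flink version, the generated
class org.example.MyRecord and an already built KafkaSource<MyRecord> called
`source` (the names here are illustrative, not taken from your project):

// Convert the Avro Utf8 / CharSequence field into a java.lang.String
// before handing the stream over to the Table API.
DataStream<MyRecord> records =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "my-topic");

DataStream<Row> rows = records
        .map(r -> Row.of(r.getText().toString()))           // Utf8 -> String
        .returns(Types.ROW_NAMED(new String[] {"text"}, Types.STRING));

// `text` is now a STRING column, so LIKE works in SQL.
Table table = tableEnv.fromDataStream(rows);
tableEnv.createTemporaryView("MyTable", table);
Table result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE text LIKE 'foo%'");

(env is a StreamExecutionEnvironment, tableEnv the StreamTableEnvironment
created from it; Row and Types come from org.apache.flink.types.Row and
org.apache.flink.api.common.typeinfo.Types.)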

Peter Schrott <peter.schrot...@googlemail.com> wrote on Mon, Oct 18, 2021 at 8:40 PM:

> Hi there,
>
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry:
>
> @namespace("org.example")
> protocol MyProtocol {
>    record MyRecord {
>       string text;
>    }
> }
>
> The topic is consumed with a KafkaSource and then passed into a
> StreamTableEnvironment. On the temporary view I want to run SQL queries.
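>
> For reference, the relevant wiring looks roughly like this (simplified; the
> full example is in the repository linked below, names here are shortened):
>
> KafkaSource<MyRecord> source = KafkaSource.<MyRecord>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTopics("my-topic")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(
>                 ConfluentRegistryAvroDeserializationSchema.forSpecific(
>                         MyRecord.class, "http://localhost:8081"))
>         .build();
>
> DataStream<MyRecord> stream =
>         env.fromSource(source, WatermarkStrategy.noWatermarks(), "my-topic");
> Table table = tableEnv.fromDataStream(stream);
> tableEnv.createTemporaryView("MyTable", table);
> // The WHERE ... LIKE on the "text" column triggers the exception below:
> tableEnv.sqlQuery("SELECT * FROM MyTable WHERE text LIKE 'flink%'");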
>
> But the following exception is thrown on startup of the job:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot 
> apply 'LIKE' to arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', 
> '...')>, <CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
>       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
>       at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>       at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>       at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>       at 
> com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
>       Caused by: org.apache.calcite.runtime.CalciteContextException: From 
> line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of 
> type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. Supported 
> form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
>       at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>       at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>       at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>       at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>       at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
>       at 
> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
>       at 
> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
>       at 
> org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
>       at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>       at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>       at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
>       at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>       at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>       at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>       at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
>       ... 5 more
>       Caused by: org.apache.calcite.sql.validate.SqlValidatorException: 
> Cannot apply 'LIKE' to arguments of type 
> 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. Supported 
> form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
>       at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>       at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>       at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>       at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>       ... 29 more
>
> It can be seen that my attribute "text" is not treated as a "String" but
> rather as "RAW / UTF8" in the Flink table environment. By default, Avro uses
> its own implementation of `CharSequence`, namely `Utf8`, when deserializing
> records (also when using the Flink DataStream API). Is there a way to
> deserialize / convert that Avro-specific type into a real "String" when
> passing data from the DataStream API to the Table API?
>
> Just for completeness, one workaround would be to annotate the Avro schema
> so that Avro uses the type "String" on deserialization under the hood. But
> in my case the schema is already fixed and cannot be changed easily.
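>
> For reference, the annotation I mean is Avro's standard avro.java.string
> property (or the stringType option of the Avro compiler); in plain JSON
> schema form the field would look roughly like this:
>
> {
>   "type": "record",
>   "name": "MyRecord",
>   "namespace": "org.example",
>   "fields": [
>     { "name": "text",
>       "type": { "type": "string", "avro.java.string": "String" } }
>   ]
> }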
>
> Please find a minimum on my github account:
> https://github.com/peterschrott/flinkStreamTableAvro
>
> Thanks & best, Peter
>
