Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-19 Thread Peter Schrott
Update:
I assume you are talking about DataStreamSource.process(.), right?
(https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#process-org.apache.flink.streaming.api.functions.ProcessFunction-)

So, similar to a .map(.) function, you suggest touching each and every event
and converting the `CharSequence` to a `String`?
I also thought about that workaround, but I don't really like it for two
reasons:
1) I do not really want to introduce another POJO that only lives inside the
Flink runtime. My record is also quite large and nested, so this would mean
additional maintenance for two (effectively identical) data structures.
2) I do not want to add one more step to the pipeline that transforms each
and every record, since that produces overhead.

That's why I was looking for a solution that passes the data stream
records to the Table API as the Avro POJO, with its Utf8 fields, and
represents the fields concerned as `STRING` (as Utf8 is an analogous data
type to String; both implement `CharSequence`).

Thanks & Best
Peter
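
Editor's illustration (not code from the thread): a minimal, plain-Java sketch
of the distinction discussed above. `Utf8Like` is a hypothetical stand-in for
Avro's `org.apache.avro.util.Utf8`; it only assumes that such a type implements
`CharSequence` over UTF-8 bytes without being a `java.lang.String`. The helper
`asString` is likewise an assumed name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.avro.util.Utf8:
// a CharSequence backed by UTF-8 bytes that is not a java.lang.String.
final class Utf8Like implements CharSequence {
    private final byte[] bytes;

    Utf8Like(String value) {
        this.bytes = value.getBytes(StandardCharsets.UTF_8);
    }

    @Override public int length() { return toString().length(); }
    @Override public char charAt(int index) { return toString().charAt(index); }
    @Override public CharSequence subSequence(int start, int end) {
        return toString().subSequence(start, end);
    }
    // Decodes the bytes back into a java.lang.String.
    @Override public String toString() { return new String(bytes, StandardCharsets.UTF_8); }
}

final class CharSequenceDemo {
    // Null-safe conversion from any CharSequence to java.lang.String.
    static String asString(CharSequence value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        CharSequence text = new Utf8Like("hello");
        System.out.println(text instanceof String);         // false: a CharSequence, but not a String
        System.out.println("hello".contentEquals(text));    // true: same characters
        System.out.println(asString(text).equals("hello")); // true after explicit conversion
    }
}
```

The sketch only shows that sharing the `CharSequence` interface does not make
the two types interchangeable in Java; it makes no claim about how the Flink
planner maps either type.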


On Tue, Oct 19, 2021 at 4:47 AM Caizhi Weng wrote:

> Hi!
>
> You can call streamSource.processRecord to change the CharSequence to a
> String, then change the stream to a table.

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-19 Thread Caizhi Weng
Hi!

Sorry for the confusion. I meant DataStream#process; see
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#process-org.apache.flink.streaming.api.functions.ProcessFunction-

On Tue, Oct 19, 2021 at 3:10 PM Peter Schrott wrote:

> Hi & thanks!
>
> DataStreamSource does not provide a method processRecord:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html
>
> Can you point me to the docs for that?
>
> Thanks, Peter

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-19 Thread Peter Schrott
Hi & thanks!

DataStreamSource does not provide a method processRecord:
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html

Can you point me to the docs for that?

Thanks, Peter

On Tue, Oct 19, 2021 at 4:47 AM Caizhi Weng wrote:

> Hi!
>
> You can call streamSource.processRecord to change the CharSequence to a
> String, then change the stream to a table.

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-18 Thread Caizhi Weng
Hi!

You can call streamSource.processRecord to change the CharSequence to a
String, then change the stream to a table.
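
Editor's illustration (not code from the thread): a minimal sketch of the
per-record conversion suggested above, written in plain Java so that it runs
without Flink. `AvroLikeRecord`, `StringRecord` and `ConvertStep` are
hypothetical names; `java.util.function.Function` merely stands in for the
map or process function that would hold this logic in a real job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical shape of the generated record: the string field is a CharSequence.
final class AvroLikeRecord {
    private final CharSequence text;
    AvroLikeRecord(CharSequence text) { this.text = text; }
    CharSequence getText() { return text; }
}

// Hypothetical second POJO whose field is a plain java.lang.String.
final class StringRecord {
    private final String text;
    StringRecord(String text) { this.text = text; }
    String getText() { return text; }
}

final class ConvertStep {
    // Per-record conversion; null fields are passed through as null.
    static final Function<AvroLikeRecord, StringRecord> TO_STRING_RECORD =
            in -> new StringRecord(in.getText() == null ? null : in.getText().toString());

    // Applies the conversion to every record, as an extra pipeline step would.
    static List<StringRecord> convertAll(List<AvroLikeRecord> records) {
        return records.stream().map(TO_STRING_RECORD).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<AvroLikeRecord> input = Arrays.asList(
                new AvroLikeRecord(new StringBuilder("first")), // a non-String CharSequence
                new AvroLikeRecord("second"));
        for (StringRecord r : convertAll(input)) {
            System.out.println(r.getText());
        }
    }
}
```

Note that this approach requires a second record type and touches every
record once.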


Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-18 Thread Peter Schrott
Hi there,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry:

@namespace("org.example")
protocol MyProtocol {
   record MyRecord {
      string text;
   }
}

The topic is consumed with a KafkaSource and then passed into
StreamTableEnvironment. On the temporary view I want to run SQL queries.

But the following exception is thrown on startup of the job:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of type 'LIKE(, )'. Supported form(s): 'LIKE(, , )'
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
  at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
  at com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of type 'LIKE(, )'. Supported form(s): 'LIKE(, , )'
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
  at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
  at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
  at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
  at org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
  at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
  at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
  at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
  at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
  at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
  at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
  at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
  at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
  ... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'LIKE' to arguments of type 'LIKE(, )'. Supported form(s): 'LIKE(, , )'
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at