Hi,

I am trying to run FlinkSQL (through sql-client.sh) and read messages from
Kafka topics.

I downloaded 1.17.1 and extracted it. For the Kafka connectivity, I've
added flink-sql-connector-kafka-1.1.7.jar, flink-connector-kafka-1.1.7.jar
and kafka-clients-2.8.1.jar and now start the sql-client-.sh script adding
these three libs with the "-l" parameter.

Now I'd like to create a table based on a Kafka topic like this:
CREATE TABLE bla (
id BIGINT,
from_account INT,
to_account INT,
amount DOUBLE,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'bla',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'bla',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);


Now when I select * from bla, I get the following error message:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: cannot assign instance of
org.apache.kafka.clients.consumer.OffsetResetStrategy to field
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy
of type
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy
in instance of
org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer

I tried different Kafka client lib versions and also multiple
flink(-sql)-connector-kafka versions with no avail :(

Best, Ralph M. Debusmann

Reply via email to