Testing locally in IDEA against the same topic works fine, but in the production environment I get this exception: org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer. Swapping StringSerializer for ByteArraySerializer produces the same kind of error, and I'm out of ideas on how to fix it. Does anyone have other suggestions for troubleshooting this? The job logic is trivial: filter rows from a source table into a sink table. Flink version: 1.11.1. Kafka version: 2.1.0.
The Kafka configuration in the SQL DDL is as follows:

source:

    create table ******* with (
        'connector' = 'kafka',
        'topic' = '**********',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'properties.group.id' = '***********',
        'properties.bootstrap.servers' = '***********:9092',
        'properties.enable.auto.commit' = 'true',
        'properties.auto.commit.interval.ms' = '1000',
        'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
        'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"****\" password=\"****\";'
    );

sink:

    create table ******* with (
        'connector' = 'kafka',
        'topic' = '*******',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'properties.bootstrap.servers' = '*****:9092',
        'properties.max.poll.records' = '50',
        'properties.enable.auto.commit' = 'true',
        'properties.auto.commit.interval.ms' = '1000',
        'properties.key.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
        'properties.value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*****\" password=\"******\";'
    );
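For context on what the quoted message means: an error of the form "X is not an instance of Y" where X plainly implements Y usually indicates the same class being loaded by two different classloaders (for example, kafka-clients bundled both in the job jar and in the cluster's lib/ directory) — a class is only an instance of an interface loaded by a compatible loader. A minimal standalone sketch of the effect, using a hypothetical `Marker` class and child-first loader (not Flink/Kafka code):

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderDemo {

    // Hypothetical stand-in for a class like StringSerializer.
    public static class Marker {}

    // Child-first loader: re-defines Marker from its own bytes instead of
    // delegating to the parent, mimicking a duplicate jar on the classpath.
    public static class ChildFirstLoader extends ClassLoader {
        public ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(Marker.class.getName())) {
                String resource = name.replace('.', '/') + ".class";
                try (InputStream in = getParent().getResourceAsStream(resource)) {
                    byte[] bytes = in.readAllBytes();
                    // Define a second copy of Marker in THIS loader.
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (IOException e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader child = new ChildFirstLoader(ClassLoaderDemo.class.getClassLoader());
        Class<?> reloaded = Class.forName(Marker.class.getName(), true, child);
        Object instance = reloaded.getDeclaredConstructor().newInstance();
        // Same class name, different loader: the instanceof check fails.
        System.out.println(instance instanceof Marker); // prints "false"
    }
}
```

If this is what is happening, the usual checks are whether kafka-clients (or flink-connector-kafka) is shipped both inside the fat jar and in the cluster's lib/, and whether the dependency should be marked provided instead.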