Spongebob created FLINK-22247:
---------------------------------

             Summary: can not pass AddressList when connecting to rabbitmq
                 Key: FLINK-22247
                 URL: https://issues.apache.org/jira/browse/FLINK-22247
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
    Affects Versions: 1.12.2
         Environment: flink: 2.12.2

rabbitmq: 3.8.4
            Reporter: Spongebob


We hope to connect to rabbitmq cluster address when using rabbitmq connector, 
So we override the setupConnection function to pass the rabbitmq cluster 
address, but the address class is not serializable thereby flink throws 
exception.
{code:java}
//代码占位符
val rabbitmqAddresses = Array(
  new Address("xxx1", 5672),
  new Address("xxx2", 5672),
  new Address("xxx3", 5672))

val dataStream = streamEnv
  .addSource(new RMQSource[String](
    rabbitmqConfig, // rabbitmq's connection config
    "queueName", // queue name
    true, // using correlation ids, assurance of exactly-once consume from 
rabbitmq
    new SimpleStringSchema // java deserialization
  ) {
    override def setupQueue(): Unit = {}

    override def setupConnection(): Connection = {
      rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
    }
  }).setParallelism(1)
{code}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
[Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object 
probably contains or references non serializable fields.Exception in thread 
"main" org.apache.flink.api.common.InvalidProgramException: 
[Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object 
probably contains or references non serializable fields. at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
 at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
 at testConsumer$.main(testConsumer.scala:30) at 
testConsumer.main(testConsumer.scala)Caused by: 
java.io.NotSerializableException: com.rabbitmq.client.Address at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at 
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
 at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) ... 
9 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to