[ https://issues.apache.org/jira/browse/FLINK-22247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-22247: ----------------------------------- Labels: stale-major (was: ) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issues has been marked as Major but is unassigned and neither itself nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" to the issue". If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > can not pass AddressList when connecting to rabbitmq > ---------------------------------------------------- > > Key: FLINK-22247 > URL: https://issues.apache.org/jira/browse/FLINK-22247 > Project: Flink > Issue Type: Improvement > Components: API / DataStream > Affects Versions: 1.12.2 > Environment: flink: 1.12.2 > rabbitmq: 3.8.4 > Reporter: Spongebob > Priority: Major > Labels: stale-major > > We hope to connect to rabbitmq cluster address when using rabbitmq connector, > So we override the setupConnection function to pass the rabbitmq cluster > address, but the address class is not serializable thereby flink throws > exception. > {code:java} > //代码占位符 > val rabbitmqAddresses = Array( > new Address("xxx1", 5672), > new Address("xxx2", 5672), > new Address("xxx3", 5672)) > val dataStream = streamEnv > .addSource(new RMQSource[String]( > rabbitmqConfig, // rabbitmq's connection config > "queueName", // queue name > true, // using correlation ids, assurance of exactly-once consume from > rabbitmq > new SimpleStringSchema // java deserialization > ) { > override def setupQueue(): Unit = {} > override def setupConnection(): Connection = { > rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses) > } > }).setParallelism(1) > {code} > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: > [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object > probably contains or references non serializable fields.Exception in thread > "main" org.apache.flink.api.common.InvalidProgramException: > [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object > probably contains or references non serializable fields. at > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at > org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693) > at testConsumer$.main(testConsumer.scala:30) at > testConsumer.main(testConsumer.scala)Caused by: > java.io.NotSerializableException: com.rabbitmq.client.Address at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at > java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) > ... 9 more -- This message was sent by Atlassian Jira (v8.3.4#803005)