[ 
https://issues.apache.org/jira/browse/FLINK-22247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-22247:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> can not pass AddressList when connecting to rabbitmq
> ----------------------------------------------------
>
>                 Key: FLINK-22247
>                 URL: https://issues.apache.org/jira/browse/FLINK-22247
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.12.2
>         Environment: flink: 1.12.2
> rabbitmq: 3.8.4
>            Reporter: Spongebob
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> We want to connect to a RabbitMQ cluster through a list of addresses when 
> using the RabbitMQ connector, so we override the setupConnection function to 
> pass the cluster addresses. However, the Address class is not serializable, 
> so Flink throws an exception.
> {code:java}
> // code placeholder
> val rabbitmqAddresses = Array(
>   new Address("xxx1", 5672),
>   new Address("xxx2", 5672),
>   new Address("xxx3", 5672))
> val dataStream = streamEnv
>   .addSource(new RMQSource[String](
>     rabbitmqConfig, // rabbitmq's connection config
>     "queueName", // queue name
>     true, // use correlation ids to ensure exactly-once consumption from RabbitMQ
>     new SimpleStringSchema // deserializes each message as a String
>   ) {
>     override def setupQueue(): Unit = {}
>     override def setupConnection(): Connection = {
>       rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
>     }
>   }).setParallelism(1)
> {code}
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object probably contains or references non serializable fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
>     at testConsumer$.main(testConsumer.scala:30)
>     at testConsumer.main(testConsumer.scala)
> Caused by: java.io.NotSerializableException: com.rabbitmq.client.Address
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
>     ... 9 more
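
A possible workaround, sketched here under assumptions and not taken from the ticket: the exception comes from the anonymous RMQSource subclass capturing an Array of com.rabbitmq.client.Address, which Java serialization cannot write. Capturing only plain (host, port) pairs and building the Address objects inside setupConnection() should avoid that, since the pairs are serializable. The names ClusterRmqSource, clusterSource, and hostPorts are hypothetical; the RMQSource constructor arguments and the two overrides mirror the snippet in the report. This has not been checked against a live RabbitMQ cluster.

```scala
import com.rabbitmq.client.{Address, Connection}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig

// Hypothetical helper object; not part of Flink or of the original report.
object ClusterRmqSource {

  // hostPorts holds only Strings and Ints, so the anonymous subclass that
  // captures it stays serializable when Flink checks the source function.
  def clusterSource(
      rabbitmqConfig: RMQConnectionConfig,
      hostPorts: Array[(String, Int)]): RMQSource[String] =
    new RMQSource[String](
      rabbitmqConfig,        // RabbitMQ connection config, as in the report
      "queueName",           // queue name, as in the report
      true,                  // use correlation ids, as in the report
      new SimpleStringSchema // deserializes each message as a String
    ) {
      // Kept from the report: skip the default queue declaration.
      override def setupQueue(): Unit = {}

      // The non-serializable Address objects are created here, inside the
      // method, so they never become fields of the serialized source.
      override def setupConnection(): Connection = {
        val addresses = hostPorts.map { case (host, port) => new Address(host, port) }
        rabbitmqConfig.getConnectionFactory.newConnection(addresses)
      }
    }
}
```

With this helper, the job from the report would call streamEnv.addSource(ClusterRmqSource.clusterSource(rabbitmqConfig, Array(("xxx1", 5672), ("xxx2", 5672), ("xxx3", 5672)))).setParallelism(1), with org.apache.flink.streaming.api.scala._ imported for the implicit type information.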



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
