[jira] [Updated] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pedro Marques updated AMQ-4585:
---
Attachment: MQTTTest.java

Tentative test code disconnecting several times.

MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
--
Key: AMQ-4585
URL: https://issues.apache.org/jira/browse/AMQ-4585
Project: ActiveMQ
Issue Type: Bug
Affects Versions: 5.8.0
Reporter: Pedro Marques
Attachments: MQTTTest.java

The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case receives several messages from a publisher, closes the subscriber connection, and finally reconnects with setCleanSession(false) and attempts to read the messages published while the subscriber was disconnected.

The exceptions thrown:

{code}
java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
    at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.net.ProtocolException: Unexpected MQTT command type: 0
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

No message is shown on the server. The problem does not always occur, but it does most of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.

Code sample (publisher, permanently running):

{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId(test_id);
int i = 0;
while (true) {
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    String message = "TestMessage: " + i;
    connection.publish(TopicA, message.getBytes(), QoS.AT_LEAST_ONCE, false);
[jira] [Updated] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pedro Marques updated AMQ-4585:
---
Description:

The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case receives several messages from a publisher, closes the subscriber connection, and finally reconnects with setCleanSession(false) and attempts to read the messages published while the subscriber was disconnected.

The exceptions thrown:

{code}
java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
    at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.net.ProtocolException: Unexpected MQTT command type: 0
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

The problem does not always occur, but it does most of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.

Code sample (publisher, permanently running):

{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId(test_id);
int i = 0;
while (true) {
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    String message = "TestMessage: " + i;
    connection.publish(TopicA, message.getBytes(), QoS.AT_LEAST_ONCE, false);
    System.out.println("Vendor: Sent message.");
    Thread.sleep(2500);
    connection.disconnect();
    Thread.sleep(2500);
    i++;
}
{code}

Code sample (subscriber, fails multiple times when restarting after the connection is closed):

{code}
try {
    MQTT mqtt = new MQTT();
    mqtt.setHost(url);
    mqtt.setClientId(clientId);
    mqtt.setUserName(user);
    mqtt.setPassword(password);
    mqtt.setCleanSession(false);
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    Topic[] topics = {new Topic(TopicA, QoS.EXACTLY_ONCE)};
    byte[] qoses =
[jira] [Updated] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pedro Marques updated AMQ-4585:
---
Description:

The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case receives several messages from a publisher, closes the subscriber connection, and finally reconnects with setCleanSession(false) and attempts to read the messages published while the subscriber was disconnected.

The exceptions thrown:

{code}
java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
    at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.net.ProtocolException: Unexpected MQTT command type: 0
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

The problem does not always occur, but it does most of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.
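For readers less familiar with the clean-session flag, the toy model below illustrates the behaviour the test case depends on: with setCleanSession(false) the broker is expected to keep the subscriber's session and deliver messages published while it was offline, whereas a clean session starts with nothing pending. This is only an in-memory sketch, not ActiveMQ or mqtt-client code. The class CleanSessionModel and its methods are hypothetical names, and the model assumes a single topic to which every known client is subscribed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy broker: models only session retention, nothing else.
public class CleanSessionModel {
    private static final class Session {
        boolean connected;
        boolean clean;
        final Queue<String> backlog = new ArrayDeque<>();
    }

    private final Map<String, Session> sessions = new HashMap<>();

    /** Connects a client and returns the messages queued while it was offline. */
    public List<String> connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            sessions.remove(clientId); // a clean session discards stored state
        }
        Session s = sessions.computeIfAbsent(clientId, id -> new Session());
        s.connected = true;
        s.clean = cleanSession;
        List<String> delivered = new ArrayList<>(s.backlog);
        s.backlog.clear();
        return delivered;
    }

    /** Disconnects a client; only non-clean sessions are kept. */
    public void disconnect(String clientId) {
        Session s = sessions.get(clientId);
        if (s == null) {
            return;
        }
        if (s.clean) {
            sessions.remove(clientId);
        } else {
            s.connected = false;
        }
    }

    /** Queues a message for every retained session that is currently offline. */
    public void publish(String message) {
        for (Session s : sessions.values()) {
            if (!s.connected) {
                s.backlog.add(message);
            }
        }
    }

    public static void main(String[] args) {
        CleanSessionModel broker = new CleanSessionModel();
        broker.connect("subscriber", false);
        broker.disconnect("subscriber");
        broker.publish("TestMessage: 0");
        broker.publish("TestMessage: 1");
        // Reconnecting without cleaning the session yields the pending messages.
        System.out.println(broker.connect("subscriber", false));
    }
}
```

In the failing scenario described above, it is the delivery of this backlog right after reconnecting that appears to trigger the exceptions.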
Code sample (publisher, permanently running):

{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId(test_id);
int i = 0;
while (true) {
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    String message = "TestMessage: " + i;
    connection.publish(TopicA, message.getBytes(), QoS.AT_LEAST_ONCE, false);
    System.out.println("Vendor: Sent message.");
    Thread.sleep(2500);
    connection.disconnect();
    Thread.sleep(2500);
    i++;
}
{code}

Code sample (subscriber, fails multiple times when restarting after the connection is closed):

{code}
BlockingConnection connection = null;
try {
    MQTT mqtt = new MQTT();
    mqtt.setHost(url);
    mqtt.setClientId(clientId);
    mqtt.setUserName(user);
    mqtt.setPassword(password);
    mqtt.setCleanSession(false);
    connection = mqtt.blockingConnection();
    connection.connect();
    Topic[] topics = {new Topic(TopicA, QoS.EXACTLY_ONCE)};
    byte[]
[jira] [Updated] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pedro Marques updated AMQ-4585:
---
Description:

The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case receives several messages from a publisher, closes the subscriber connection, and finally reconnects with setCleanSession(false) and attempts to read the messages published while the subscriber was disconnected.

The exceptions thrown:

{code}
java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
    at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

{code}
java.net.ProtocolException: Unexpected MQTT command type: 0
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}

No message is shown on the server. The problem does not always occur, but it does most of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.
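As background for the second and third traces: in MQTT 3.1 the message type occupies the upper four bits of the first fixed-header byte, and type 0 is reserved, so a frame read with a zero type nibble is not a valid command. Acknowledgement frames such as PUBREC carry a two-byte, big-endian message id in the variable header. The sketch below decodes both fields. The class MqttFixedHeader is a hypothetical helper, not part of mqtt-client, and whether the client is really seeing empty or misaligned frames in this issue is an assumption that the report does not confirm.

```java
// Hypothetical helper for illustration; not the mqtt-client codec.
public class MqttFixedHeader {
    // MQTT 3.1 message type names, indexed by the 4-bit type code.
    private static final String[] TYPE_NAMES = {
        "RESERVED(0)", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC",
        "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
        "PINGREQ", "PINGRESP", "DISCONNECT", "RESERVED(15)"
    };

    /** Returns the message type: the upper four bits of the first header byte. */
    public static int messageType(byte firstByte) {
        return (firstByte & 0xF0) >>> 4;
    }

    /** Returns a readable name for the message type of the given header byte. */
    public static String typeName(byte firstByte) {
        return TYPE_NAMES[messageType(firstByte)];
    }

    /**
     * Reads the message id of an acknowledgement frame (PUBACK, PUBREC,
     * PUBREL, PUBCOMP): two bytes, most significant first.
     */
    public static int ackMessageId(byte[] variableHeader) {
        int length = (variableHeader == null) ? 0 : variableHeader.length;
        if (length < 2) {
            // Checking the length up front avoids an index error on an empty frame.
            throw new IllegalArgumentException(
                "ack frame needs a 2-byte message id, got " + length + " byte(s)");
        }
        return ((variableHeader[0] & 0xFF) << 8) | (variableHeader[1] & 0xFF);
    }

    public static void main(String[] args) {
        System.out.println(typeName((byte) 0x50)); // type nibble 5
        System.out.println(typeName((byte) 0x00)); // type nibble 0, reserved
        System.out.println(ackMessageId(new byte[] {0x00, 0x01}));
    }
}
```

Logging the first byte and body length of each frame in this way, just before the client decodes it, might help establish which of the three failures a given reconnect is hitting.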
Code sample (publisher, permanently running):

{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId(test_id);
int i = 0;
while (true) {
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    String message = "TestMessage: " + i;
    connection.publish(TopicA, message.getBytes(), QoS.AT_LEAST_ONCE, false);
    System.out.println("Vendor: Sent message.");
    Thread.sleep(2500);
    connection.disconnect();
    Thread.sleep(2500);
    i++;
}
{code}

Code sample (subscriber, fails multiple times when restarting after the connection is closed):

{code}
BlockingConnection connection = null;
try {
    MQTT mqtt = new MQTT();
    mqtt.setHost(url);
    mqtt.setClientId(clientId);
    mqtt.setUserName(user);
    mqtt.setPassword(password);
    mqtt.setCleanSession(false);
    connection = mqtt.blockingConnection();
    connection.connect();
    Topic[] topics = {new