This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 538b04aa0 [AMQ-9255] Initialize the transient field of the class 
Message (#1051)
538b04aa0 is described below

commit 538b04aa0c18f61cd47b261c7372a3e559c2ca0e
Author: Nicolas Filotto <essob...@users.noreply.github.com>
AuthorDate: Tue Oct 3 13:54:40 2023 +0200

    [AMQ-9255] Initialize the transient field of the class Message (#1051)
---
 .../activemq/command/ActiveMQMapMessage.java       |   4 +-
 .../java/org/apache/activemq/command/Message.java  |  13 ++
 .../java/org/apache/activemq/bugs/AMQ9255Test.java | 150 +++++++++++++++++++++
 3 files changed, 166 insertions(+), 1 deletion(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index 8384c3473..1bd6e5362 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -104,7 +104,9 @@ public class ActiveMQMapMessage extends ActiveMQMessage 
implements MapMessage {
 
     protected transient Map<String, Object> map = new HashMap<String, 
Object>();
 
-    private Object readResolve() throws ObjectStreamException {
+    @Override
+    protected Object readResolve() throws ObjectStreamException {
+        super.readResolve();
         if (this.map == null) {
             this.map = new HashMap<String, Object>();
         }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/Message.java 
b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index 88e9787cb..2a31047c9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -20,6 +20,7 @@ import java.beans.Transient;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectStreamException;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.HashMap;
@@ -858,4 +859,16 @@ public abstract class Message extends BaseCommand 
implements MarshallAware, Mess
     public boolean canProcessAsExpired() {
         return processAsExpired.compareAndSet(false, true);
     }
+
+    /**
+     * Initialize the transient fields at deserialization to get a normal 
state.
+     *
+     * @see <a 
href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/Serializable.html";>Serializable
 Javadoc</a>
+     */
+    protected Object readResolve() throws ObjectStreamException {
+        if (this.processAsExpired == null) {
+            this.processAsExpired = new AtomicBoolean();
+        }
+        return this;
+    }
 }
diff --git 
a/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java 
b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java
new file mode 100644
index 000000000..ad0f9f356
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.http.WaitForJettyListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ9255Test {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ9255Test.class);
+
+    @Rule
+    public TestName name = new TestName();
+    private BrokerService broker;
+    private ActiveMQConnectionFactory connectionFactory;
+    private Connection sendConnection, receiveConnection;
+    private Session sendSession, receiveSession;
+    private MessageConsumer consumer;
+    private MessageProducer producer;
+
+    @Before
+    public void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+            broker.start();
+        }
+        WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL());
+        connectionFactory = createConnectionFactory();
+        LOG.info("Creating send connection");
+        sendConnection = createSendConnection();
+        LOG.info("Starting send connection");
+        sendConnection.start();
+
+        LOG.info("Creating receive connection");
+        receiveConnection = createReceiveConnection();
+        LOG.info("Starting receive connection");
+        receiveConnection.start();
+
+        sendSession = sendConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        receiveSession = receiveConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        LOG.info("Created sendSession: " + sendSession);
+        LOG.info("Created receiveSession: " + receiveSession);
+
+        producer = 
sendSession.createProducer(sendSession.createQueue(getProducerSubject()));
+        consumer = 
receiveSession.createConsumer(receiveSession.createQueue(getConsumerSubject()));
+
+        LOG.info("Created consumer of type: " + consumer.getClass());
+        LOG.info("Created producer of type: " + producer.getClass());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (receiveSession != null) {
+            receiveSession.close();
+        }
+        if (sendSession != null) {
+            sendSession.close();
+        }
+        if (receiveConnection != null) {
+            receiveConnection.close();
+        }
+        if (sendConnection != null) {
+            sendConnection.close();
+        }
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private String getConsumerSubject() {
+        return "ActiveMQ.DLQ";
+    }
+
+    private String getProducerSubject() {
+        return name.getMethodName();
+    }
+
+    private Connection createReceiveConnection() throws Exception {
+        return connectionFactory.createConnection();
+    }
+
+    private Connection createSendConnection() throws Exception {
+        return connectionFactory.createConnection();
+    }
+
+    private ActiveMQConnectionFactory createConnectionFactory() {
+        return new ActiveMQConnectionFactory(getBrokerURL());
+    }
+
+    protected String getBrokerURL() {
+        return "http://localhost:8161";;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.addConnector(getBrokerURL());
+        answer.setUseJmx(false);
+        return answer;
+    }
+
+    @Test
+    public void testExpiredMessages() throws Exception {
+        // Given
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.setTimeToLive(100L);
+        String text = name.toString();
+
+        // When
+        producer.send(sendSession.createTextMessage(text));
+
+        // Then
+        TextMessage message = (TextMessage) consumer.receive(30_000);
+        assertNotNull(message);
+        assertEquals(text, message.getText());
+    }
+}

Reply via email to