Repository: cxf
Updated Branches:
  refs/heads/3.1.x-fixes 8d6b79f20 -> 4406683df


[CXF-7096] Make sure everything is cleaned up if the destination sequecne times 
out


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/15ed5d4f
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/15ed5d4f
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/15ed5d4f

Branch: refs/heads/3.1.x-fixes
Commit: 15ed5d4f8d4e2443a668c29d96626debcd01a8db
Parents: 8d6b79f
Author: Daniel Kulp <dk...@apache.org>
Authored: Tue Apr 4 12:07:45 2017 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Tue Apr 4 12:53:40 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/cxf/ws/rm/Destination.java  |   5 +-
 .../apache/cxf/ws/rm/DestinationSequence.java   |  15 +-
 .../java/org/apache/cxf/ws/rm/RMManager.java    |   8 +
 .../cxf/ws/rm/DestinationSequenceTest.java      |   4 +-
 .../cxf/systest/ws/rm/SequenceTimeoutTest.java  | 213 +++++++++++++++++++
 5 files changed, 237 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
index 3d3489b..ef1ea1a 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
@@ -78,8 +78,11 @@ public class Destination extends AbstractEndpoint {
 
     // this method ensures to keep the sequence until all the messages are 
delivered
     public void terminateSequence(DestinationSequence seq) {
+        terminateSequence(seq, false);
+    }
+    public void terminateSequence(DestinationSequence seq, boolean 
forceRemove) {
         seq.terminate();
-        if (seq.allAcknowledgedMessagesDelivered()) {
+        if (forceRemove || seq.allAcknowledgedMessagesDelivered()) {
             removeSequence(seq);
         }
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 3442fc5..d186194 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -590,11 +590,16 @@ public class DestinationSequence extends AbstractSequence 
{
                     
                     // terminate regardless outstanding acknowledgments - as 
we assume that the client is
                     // gone there is no point in sending a 
SequenceAcknowledgment
-                    
-                    LogUtils.log(LOG, Level.WARNING, 
"TERMINATING_INACTIVE_SEQ_MSG", 
+                    LogUtils.log(LOG, Level.WARNING, 
"TERMINATING_INACTIVE_SEQ_MSG",
                                  
DestinationSequence.this.getIdentifier().getValue());
-                    
DestinationSequence.this.destination.terminateSequence(DestinationSequence.this);
-
+                    
DestinationSequence.this.destination.terminateSequence(DestinationSequence.this,
 true);
+                    Source source = rme.getSource();
+                    if (source != null) {
+                        SourceSequence ss = 
source.getAssociatedSequence(DestinationSequence.this.getIdentifier());
+                        if (ss != null) {
+                            source.removeSequence(ss);
+                        }
+                    }
                 } else {
                    // reschedule 
                     SequenceTermination st = new SequenceTermination();
@@ -605,4 +610,4 @@ public class DestinationSequence extends AbstractSequence {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java 
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
index a29a6a7..603e8d6 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
@@ -404,6 +404,14 @@ public class RMManager {
         }
         return rme;
     }
+    public RMEndpoint findReliableEndpoint(QName qn) {
+        for (RMEndpoint rpe : reliableEndpoints.values()) {
+            if 
(qn.equals(rpe.getApplicationEndpoint().getService().getName())) {
+                return rpe;
+            }
+        }
+        return null;
+    }
 
     public Destination getDestination(Message message) throws RMException {
         RMEndpoint rme = getReliableEndpoint(message);

http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
----------------------------------------------------------------------
diff --git 
a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java 
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
index 546f5f3..72666b8 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
@@ -567,7 +567,7 @@ public class DestinationSequenceTest extends Assert {
         
         DestinationSequence seq = new DestinationSequence(id, ref, destination,
             ProtocolVariation.RM10WSA200408);
-        destination.terminateSequence(seq);
+        destination.terminateSequence(seq, true);
         EasyMock.expectLastCall();
         
         Message message = setUpMessage("1");
@@ -601,7 +601,7 @@ public class DestinationSequenceTest extends Assert {
         long lastAppMessage = System.currentTimeMillis() - 30000L;
         EasyMock.expect(rme.getLastControlMessage()).andReturn(0L);
         
EasyMock.expect(rme.getLastApplicationMessage()).andReturn(lastAppMessage);
-        destination.terminateSequence(seq);
+        destination.terminateSequence(seq, true);
         EasyMock.expectLastCall();
         control.replay();
         st.run();

http://git-wip-us.apache.org/repos/asf/cxf/blob/15ed5d4f/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java
 
b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java
new file mode 100644
index 0000000..0a04564
--- /dev/null
+++ 
b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTimeoutTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Dispatch;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Service;
+import javax.xml.ws.handler.MessageContext;
+
+import org.w3c.dom.Document;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.testutil.common.TestUtil;
+import org.apache.cxf.ws.rm.RMEndpoint;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.manager.AcksPolicyType;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * 
+ */
+public class SequenceTimeoutTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = 
TestUtil.getPortNumber(SequenceTimeoutTest.class);
+    private static final String ADDRESS = "http://localhost:"; + PORT + 
"/SoapContext/GreeterPort";
+    private static final QName GREETME_NAME
+        = new QName("http://cxf.apache.org/greeter_control";, "greetMe");
+    private static final QName GREETME_SERVICE_NAME
+        = new QName("http://cxf.apache.org/greeter_control";, "GreeterService");
+
+    private static RMManager rmManager;
+    
+    private Bus greeterBus;
+    private Greeter greeter;
+
+
+    public static class Server extends AbstractBusTestServerBase {
+        Endpoint endpoint;
+
+        protected void run()  {
+            SpringBusFactory bf = new SpringBusFactory();
+            System.setProperty("db.name", "rdbm");
+            Bus bus = 
bf.createBus("org/apache/cxf/systest/ws/rm/rminterceptors.xml");
+            System.clearProperty("db.name");
+            BusFactory.setDefaultBus(bus);
+            
+            setBus(bus);
+            
+            rmManager = bus.getExtension(RMManager.class);
+            rmManager.getConfiguration().setInactivityTimeout(1000L);
+
+            //System.out.println("Created control bus " + bus);
+            GreeterImpl greeterImplementor = new GreeterImpl();
+            endpoint = Endpoint.publish(ADDRESS, greeterImplementor);
+
+            BusFactory.setDefaultBus(null);
+            BusFactory.setThreadDefaultBus(null);
+        }
+
+        public void tearDown() throws Exception {
+            endpoint.stop();
+        }
+    }
+    
+    
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        assertTrue("server did not launch correctly", 
launchServer(Server.class, true));
+    }
+
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    private void init(String cfgResource, boolean useDecoupledEndpoint, 
boolean useDispatchClient) {
+        init(cfgResource, useDecoupledEndpoint, useDispatchClient, null);
+    }
+
+    private void init(String cfgResource,
+                      boolean useDecoupledEndpoint,
+                      boolean useDispatchClient,
+                      Executor executor) {
+
+        SpringBusFactory bf = new SpringBusFactory();
+        initGreeterBus(bf, cfgResource);
+        if (useDispatchClient) {
+            initDispatch();
+        } else {
+            initProxy(useDecoupledEndpoint, executor);
+        }
+    }
+    private void initGreeterBus(SpringBusFactory bf,
+                                String cfgResource) {
+        greeterBus = bf.createBus(cfgResource);
+        BusFactory.setDefaultBus(greeterBus);
+    }
+    
+    
+    private Dispatch<DOMSource> initDispatch() {
+        GreeterService gs = new GreeterService();
+        Dispatch<DOMSource> dispatch = 
gs.createDispatch(GreeterService.GreeterPort,
+                                                         DOMSource.class,
+                                                         Service.Mode.MESSAGE);
+        try {
+            updateAddressPort(dispatch, PORT);
+        } catch (Exception e) {
+            //ignore
+        }
+        
dispatch.getRequestContext().put(BindingProvider.SOAPACTION_USE_PROPERTY, 
Boolean.FALSE);
+        dispatch.getRequestContext().put(MessageContext.WSDL_OPERATION, 
GREETME_NAME);
+
+        return dispatch;
+    }
+
+    private void initProxy(boolean useDecoupledEndpoint, Executor executor) {
+        GreeterService gs = new GreeterService();
+
+        if (null != executor) {
+            gs.setExecutor(executor);
+        }
+
+        greeter = gs.getGreeterPort();
+        try {
+            updateAddressPort(greeter, PORT);
+        } catch (Exception e) {
+            //ignore
+        }
+
+        ConnectionHelper.setKeepAliveConnection(greeter, true);
+    }
+    @Test
+    public void testTimeout() throws Exception {
+        init("org/apache/cxf/systest/ws/rm/rminterceptors.xml", true, true);
+
+        List<Dispatch<DOMSource>> dispatches = new ArrayList<>(5);
+        int count = 5;
+        for (int x = 0; x < count; x++) {
+            Dispatch<DOMSource> dispatch = initDispatch();
+            AcksPolicyType ap = new AcksPolicyType();            
+            //don't send the acks to cause a memory leak - CXF-7096
+            ap.setImmediaAcksTimeout(500000L);
+            
greeterBus.getExtension(RMManager.class).getDestinationPolicy().setAcksPolicy(ap);
          
+            dispatch.invoke(getDOMRequest("One"));
+            dispatches.add(dispatch);
+        }
+        RMEndpoint ep = rmManager.findReliableEndpoint(GREETME_SERVICE_NAME);
+        Assert.assertNotNull(ep);
+        Assert.assertEquals(count, 
ep.getDestination().getAllSequences().size());
+        Assert.assertEquals(count, ep.getSource().getAllSequences().size());
+        Thread.sleep(2500);
+        System.gc();
+        Assert.assertEquals(0, ep.getDestination().getAllSequences().size());
+        Assert.assertEquals(0, ep.getSource().getAllSequences().size());
+        try {
+            dispatches.get(0).invoke(getDOMRequest("One"));
+            fail("The sequence should have been terminated");
+        } catch (Throwable t) {
+            //expected
+            Assert.assertTrue(t.getMessage().contains("not a known Sequence 
identifier"));
+        }
+        rmManager.getStore();
+    }
+    private DOMSource getDOMRequest(String n)
+        throws Exception {
+        InputStream is =
+            getClass().getResourceAsStream("twoway"
+                                           + "Req" + n + ".xml");
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        factory.setNamespaceAware(true);
+        DocumentBuilder builder = factory.newDocumentBuilder();
+        Document newDoc = builder.parse(is);
+        return new DOMSource(newDoc);
+    }
+}

Reply via email to