Author: ningjiang
Date: Tue Oct 9 02:29:43 2007
New Revision: 583087
URL: http://svn.apache.org/viewvc?rev=583087&view=rev
Log:
CXF-1093 applied patch of delegating the thread pool request to the WorkManager
, thanks Jeff. Also fixed a typo in the jaxws ClientServerTest
Added:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java
(with props)
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java
(with props)
incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java
(with props)
Modified:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/JCABusFactory.java
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/servant/EJBEndpoint.java
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngineFactory.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
Added:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java?rev=583087&view=auto
==============================================================================
---
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java
(added)
+++
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java
Tue Oct 9 02:29:43 2007
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jca.cxf;
+
+import java.util.logging.Logger;
+
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkListener;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+/**
+ *
+ */
+public class CXFWorkAdapter implements WorkListener {
+
+ public static final long DEFAULT_START_TIME_OUT = 1 * 60 * 1000; // 1
minute
+
+ private static final Logger LOG =
LogUtils.getL7dLogger(CXFWorkAdapter.class);
+
+ public void workAccepted(WorkEvent e) {
+ LOG.fine("workAccepted: [" + e.getWork() + "], source is [" +
e.getSource() + "]");
+ }
+
+
+ public void workCompleted(WorkEvent e) {
+ LOG.fine("workCompleted: [" + e.getWork() + "], source is [" +
e.getSource() + "]");
+ }
+
+
+ public void workRejected(WorkEvent e) {
+ LOG.severe("workRejected: [" + e.getWork() + "], source is [" +
e.getSource() + "]");
+ LOG.severe("root cause is:" + e.getException().getMessage());
+
+ e.getException().printStackTrace();
+ }
+
+
+ public void workStarted(WorkEvent e) {
+ LOG.fine("workStarted: [" + e.getWork() + "], source is [" +
e.getSource() + "]");
+ }
+
+}
Propchange:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/CXFWorkAdapter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/JCABusFactory.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/JCABusFactory.java?rev=583087&r1=583086&r2=583087&view=diff
==============================================================================
---
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/JCABusFactory.java
(original)
+++
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/JCABusFactory.java
Tue Oct 9 02:29:43 2007
@@ -30,6 +30,9 @@
import java.util.logging.Logger;
import javax.resource.ResourceException;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkManager;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
@@ -54,6 +57,7 @@
private ClassLoader appserverClassLoader;
private ManagedConnectionFactoryImpl mcf;
private Object raBootstrapContext;
+
public JCABusFactory(ManagedConnectionFactoryImpl aMcf) {
this.mcf = aMcf;
@@ -102,7 +106,7 @@
protected void initializeServants() throws ResourceException {
if (isMonitorEJBServicePropertiesEnabled()) {
LOG.info("Ejb service properties auto-detect enabled. ");
- startPropertiesMonitorThread();
+ startPropertiesMonitorWorker();
} else {
URL propsUrl = mcf.getEJBServicePropertiesURLInstance();
if (propsUrl != null) {
@@ -124,6 +128,7 @@
EJBServantConfig config = new EJBServantConfig(theJNDIName,
value);
EJBEndpoint ejbEndpoint = new EJBEndpoint(config);
ejbEndpoint.setEjbServantBaseURL(mcf.getEJBServantBaseURL());
+ ejbEndpoint.setWorkManager(getWorkManager());
Server servant = ejbEndpoint.publish();
synchronized (servantsCache) {
@@ -140,16 +145,20 @@
}
- private void startPropertiesMonitorThread() throws ResourceException {
+ private void startPropertiesMonitorWorker() throws ResourceException {
Integer pollIntervalInteger =
mcf.getEJBServicePropertiesPollInterval();
int pollInterval = pollIntervalInteger.intValue();
LOG.info("Ejb service properties poll interval is: [" + pollInterval +
" seconds]");
- EJBServicePropertiesMonitorRunnable r = new
EJBServicePropertiesMonitorRunnable(pollInterval);
- Thread t = new Thread(r);
- t.setDaemon(true);
- t.start();
+ EJBServicePropertiesMonitorWorker worker = new
EJBServicePropertiesMonitorWorker(pollInterval);
+ if (getWorkManager() != null) {
+ getWorkManager().startWork(worker,
CXFWorkAdapter.DEFAULT_START_TIME_OUT, null, worker);
+ } else {
+ Thread t = new Thread(worker);
+ t.setDaemon(true);
+ t.start();
+ }
}
private boolean isMonitorEJBServicePropertiesEnabled() throws
ResourceException {
@@ -239,13 +248,15 @@
init();
}
- private class EJBServicePropertiesMonitorRunnable implements Runnable {
+ private class EJBServicePropertiesMonitorWorker extends CXFWorkAdapter
implements Work {
private long previousModificationTime;
private final int pollIntervalSeconds;
private final File propsFile;
- private boolean continuing = true;
+
+ //The release() method will be called on separate thread while the
run() is processing.
+ private volatile boolean continuing = true;
- EJBServicePropertiesMonitorRunnable(int pollInterval) throws
ResourceException {
+ EJBServicePropertiesMonitorWorker(int pollInterval) throws
ResourceException {
pollIntervalSeconds = pollInterval;
propsFile = new
File(mcf.getEJBServicePropertiesURLInstance().getPath());
}
@@ -268,6 +279,10 @@
}
} while (continuing);
}
+
+ public void release() {
+ this.continuing = false;
+ }
protected boolean isPropertiesFileModified() {
boolean fileModified = false;
@@ -285,6 +300,14 @@
// for unit test
protected void setBootstrapContext(Object ctx) {
raBootstrapContext = ctx;
+ }
+
+ public WorkManager getWorkManager() {
+ if (getBootstrapContext() instanceof BootstrapContext) {
+ BootstrapContext context = (BootstrapContext)getBootstrapContext();
+ return context.getWorkManager();
+ }
+ return null;
}
Added:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java?rev=583087&view=auto
==============================================================================
---
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java
(added)
+++
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java
Tue Oct 9 02:29:43 2007
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jca.cxf;
+
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkManager;
+
+import org.mortbay.thread.ThreadPool;
+
+/**
+ * The adapter for using Application Server's thread pool.
+ * Just simply override the dispatch method.
+ */
+public class WorkManagerThreadPool extends CXFWorkAdapter implements
ThreadPool {
+
+ private WorkManager workManager;
+
+ private boolean isLowOnThreads;
+
+ private Runnable theJob;
+
+ public WorkManagerThreadPool(WorkManager wm) {
+ this.workManager = wm;
+ }
+
+ public boolean dispatch(Runnable job) {
+ try {
+ theJob = job;
+ workManager.startWork(new WorkImpl(job), DEFAULT_START_TIME_OUT,
null, this);
+ return true;
+ } catch (WorkException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+
+ public int getIdleThreads() {
+ return 0;
+ }
+
+
+ public int getThreads() {
+ return 1;
+ }
+
+
+ public boolean isLowOnThreads() {
+ return isLowOnThreads;
+ }
+
+
+ public void setIsLowOnThreads(boolean isLow) {
+ this.isLowOnThreads = isLow;
+ }
+
+ public void join() throws InterruptedException {
+ //Do nothing
+ }
+
+ public class WorkImpl implements Work {
+
+ private Runnable job;
+
+ public WorkImpl(Runnable job) {
+ this.job = job;
+ }
+
+ public void run() {
+ job.run();
+ }
+
+ public void release() {
+ //empty
+ }
+ }
+
+ @Override
+ public void workRejected(WorkEvent e) {
+ super.workRejected(e);
+ WorkException we = e.getException();
+ if (WorkException.START_TIMED_OUT.equals(we.getErrorCode()) &&
!isLowOnThreads) {
+ setIsLowOnThreads(true);
+ dispatch(theJob);
+ }
+ }
+
+}
Propchange:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/cxf/WorkManagerThreadPool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/servant/EJBEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/servant/EJBEndpoint.java?rev=583087&r1=583086&r2=583087&view=diff
==============================================================================
---
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/servant/EJBEndpoint.java
(original)
+++
incubator/cxf/trunk/integration/jca/src/main/java/org/apache/cxf/jca/servant/EJBEndpoint.java
Tue Oct 9 02:29:43 2007
@@ -21,19 +21,28 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.logging.Logger;
import javax.ejb.EJBHome;
import javax.jws.WebService;
import javax.naming.Context;
import javax.naming.InitialContext;
+import javax.resource.spi.work.WorkManager;
import javax.rmi.PortableRemoteObject;
+import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.PackageUtils;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.frontend.ServerFactoryBean;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
+import org.apache.cxf.jca.cxf.WorkManagerThreadPool;
+import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngine;
+import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
+import org.mortbay.jetty.AbstractConnector;
+import org.mortbay.jetty.nio.SelectChannelConnector;
public class EJBEndpoint {
@@ -48,6 +57,8 @@
private String ejbServantBaseURL;
+ private WorkManager workManager;
+
public EJBEndpoint(EJBServantConfig ejbConfig) {
this.config = ejbConfig;
}
@@ -72,9 +83,33 @@
:
getDefaultEJBServantBaseURL();
String address = baseAddress + "/" + config.getJNDIName();
factory.setAddress(address);
+ if (getWorkManager() != null) {
+ setWorkManagerThreadPoolToJetty(factory.getBus(), baseAddress);
+ }
+ Server server = factory.create();
LOG.info("Published EJB Endpoint of [" + config.getJNDIName() + "] at
[" + address + "]");
- return factory.create();
+
+ return server;
+ }
+
+
+ protected void setWorkManagerThreadPoolToJetty(Bus bus, String
baseAddress) {
+ JettyHTTPServerEngineFactory engineFactory =
bus.getExtension(JettyHTTPServerEngineFactory.class);
+ int port = getAddressPort(baseAddress);
+ if (engineFactory.retrieveJettyHTTPServerEngine(port) != null) {
+ return;
+ }
+ JettyHTTPServerEngine engine = new JettyHTTPServerEngine();
+ AbstractConnector connector = new SelectChannelConnector();
+ connector.setPort(port);
+ connector.setThreadPool(new WorkManagerThreadPool(getWorkManager()));
+ engine.setConnector(connector);
+ engine.setPort(port);
+
+ List<JettyHTTPServerEngine> engineList = new
ArrayList<JettyHTTPServerEngine>();
+ engineList.add(engine);
+ engineFactory.setEnginesList(engineList);
}
public String getServiceClassName() throws Exception {
@@ -95,6 +130,18 @@
return "http://" + hostName + ":9999";
}
+ public int getAddressPort(String address) {
+ int index = address.lastIndexOf(":");
+ int end = address.lastIndexOf("/");
+ if (index == 4) {
+ return 80;
+ }
+ if (end < index) {
+ return new Integer(address.substring(index + 1)).intValue();
+ }
+ return new Integer(address.substring(index + 1, end)).intValue();
+ }
+
private static boolean isJaxWsServiceInterface(Class<?> cls) {
if (cls == null) {
return false;
@@ -119,5 +166,15 @@
}
return false;
}
+
+ public WorkManager getWorkManager() {
+ return workManager;
+ }
+
+ public void setWorkManager(WorkManager workManager) {
+ this.workManager = workManager;
+ }
+
+
}
Added:
incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java?rev=583087&view=auto
==============================================================================
---
incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java
(added)
+++
incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java
Tue Oct 9 02:29:43 2007
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jca.servant;
+
+import junit.framework.Assert;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+
+/**
+ *
+ */
+public class EJBEndpointTest extends Assert {
+
+ private EJBEndpoint endpoint;
+
+ @Before
+ public void setUp() throws Exception {
+ endpoint = new EJBEndpoint(null);
+ }
+
+ @Test
+ public void testGetAddressPort() throws Exception {
+ int port = endpoint.getAddressPort("http://localhost:8080/services");
+ assertEquals(8080, port);
+ }
+
+ @Test
+ public void testGetAddress80Port() throws Exception {
+ int port = endpoint.getAddressPort("http://localhost/services");
+ assertEquals(80, port);
+ }
+
+ @Test
+ public void testGetAddressEndPort() throws Exception {
+ int port = endpoint.getAddressPort("http://localhost:9999");
+ assertEquals(9999, port);
+ }
+
+}
Propchange:
incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/cxf/trunk/integration/jca/src/test/java/org/apache/cxf/jca/servant/EJBEndpointTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java?rev=583087&r1=583086&r2=583087&view=diff
==============================================================================
---
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
(original)
+++
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngine.java
Tue Oct 9 02:29:43 2007
@@ -166,7 +166,7 @@
*/
public void shutdown() {
if (shouldDestroyPort()) {
- if (servantCount == 0) {
+ if (factory != null && servantCount == 0) {
factory.destroyForPort(port);
} else {
LOG.log(Level.WARNING, "FAILED_TO_SHOWDOWN_ENGINE_MSG", port);
Modified:
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngineFactory.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngineFactory.java?rev=583087&r1=583086&r2=583087&view=diff
==============================================================================
---
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngineFactory.java
(original)
+++
incubator/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPServerEngineFactory.java
Tue Oct 9 02:29:43 2007
@@ -144,7 +144,7 @@
* This call retrieves a previously configured JettyHTTPServerEngine for
the
* given port. If none exists, this call returns null.
*/
- protected synchronized JettyHTTPServerEngine
retrieveJettyHTTPServerEngine(int port) {
+ public synchronized JettyHTTPServerEngine
retrieveJettyHTTPServerEngine(int port) {
return portMap.get(port);
}
Modified:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
URL:
http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java?rev=583087&r1=583086&r2=583087&view=diff
==============================================================================
---
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
(original)
+++
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
Tue Oct 9 02:29:43 2007
@@ -85,7 +85,7 @@
"SoapPort");
private final QName fakePortName = new
QName("http://apache.org/hello_world_soap_http",
- "FackPort");
+ "FakePort");
private final QName portName1 = new
QName("http://apache.org/hello_world_soap_http",