This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.5.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 583cc8d562ba8e629227ca3c083577b553390d97 Author: Andriy Redko <[email protected]> AuthorDate: Sat Feb 15 10:42:30 2025 -0500 CXF-9057: Chunked Stream is closed regularly when Exception is thrown (MTOM) (#2234) (cherry picked from commit e28d4a77a7ee59d03219ff4767eae1d7bedeb207) (cherry picked from commit 7f14b6284f14e4ebe18e4f8411687d5120e91f55) # Conflicts: # systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java (cherry picked from commit 07b6af3eb368e691a85e15ad77e07aedfc6a9d33) # Conflicts: # systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java --- .../cxf/phase/AbortedInvocationException.java | 48 +++++++++ .../apache/cxf/phase/PhaseInterceptorChain.java | 3 + .../interceptor/Soap11FaultOutInterceptor.java | 8 +- .../interceptor/Soap12FaultOutInterceptor.java | 8 +- .../transport/http/AbstractHTTPDestination.java | 29 +++++- ...st.java => AbstractAttachmentChunkingTest.java} | 79 +++------------ .../cxf/systest/jaxws/AttachmentChunkingTest.java | 102 ++------------------ ...ngTest.java => AttachmentMtomChunkingTest.java} | 107 ++++----------------- .../src/test/resources/attachments/cxf9057.wsdl | 1 + 9 files changed, 135 insertions(+), 250 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/phase/AbortedInvocationException.java b/core/src/main/java/org/apache/cxf/phase/AbortedInvocationException.java new file mode 100644 index 0000000000..ee2553d1af --- /dev/null +++ b/core/src/main/java/org/apache/cxf/phase/AbortedInvocationException.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.phase; + +/** + * Represents transport-specific exceptions which are used to indicate that + * a given invocation was aborted + */ +public class AbortedInvocationException extends RuntimeException { + + private static final long serialVersionUID = 6889545463301144757L; + + + public AbortedInvocationException(Throwable cause) { + super(cause); + } + + public AbortedInvocationException() { + } + + + /** + * Returns a transport-specific runtime exception + * @return RuntimeException the transport-specific runtime exception, + * can be null for asynchronous transports + */ + public RuntimeException getRuntimeException() { + Throwable ex = getCause(); + return ex instanceof RuntimeException ? 
(RuntimeException)ex : null; + } +} diff --git a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java b/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java index c8a52a8e95..316e58d26f 100644 --- a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java +++ b/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java @@ -325,6 +325,9 @@ public class PhaseInterceptorChain implements InterceptorChain { } pause(); throw ex; + } catch (AbortedInvocationException ex) { + abort(); + throw ex; } catch (RuntimeException ex) { if (!faultOccurred) { faultOccurred = true; diff --git a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java index 5e612a5932..95ebee9f7c 100644 --- a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java +++ b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java @@ -37,6 +37,7 @@ import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.phase.AbortedInvocationException; import org.apache.cxf.phase.Phase; import org.apache.cxf.staxutils.StaxUtils; @@ -69,7 +70,12 @@ public class Soap11FaultOutInterceptor extends AbstractSoapInterceptor { // have been streaming some data already and may not be able to inject a fault in the middle // of the data transfer. 
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) { - throw f; + // Signal that response has to be aborted midway + if (MessageUtils.getContextualBoolean(message, Message.MTOM_ENABLED, false)) { + throw new AbortedInvocationException(f); + } else { + throw f; + } } XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); diff --git a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java index 35cd7498a6..e766282c02 100644 --- a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java +++ b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java @@ -38,6 +38,7 @@ import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.phase.AbortedInvocationException; import org.apache.cxf.phase.Phase; import org.apache.cxf.staxutils.StaxUtils; @@ -71,7 +72,12 @@ public class Soap12FaultOutInterceptor extends AbstractSoapInterceptor { // have been streaming some data already and may not be able to inject a fault in the middle // of the data transfer. 
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) { - throw f; + // Signal that response has to be aborted midway + if (MessageUtils.getContextualBoolean(message, Message.MTOM_ENABLED, false)) { + throw new AbortedInvocationException(f); + } else { + throw f; + } } XMLStreamWriter writer = message.getContent(XMLStreamWriter.class); diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java index 29f93b81fc..3933f7d494 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java @@ -63,6 +63,7 @@ import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.phase.AbortedInvocationException; import org.apache.cxf.policy.PolicyDataEngine; import org.apache.cxf.security.SecurityContext; import org.apache.cxf.security.transport.TLSSessionInfo; @@ -262,8 +263,17 @@ public abstract class AbstractHTTPDestination copyKnownRequestAttributes(req, inMessage); try { - incomingObserver.onMessage(inMessage); - invokeComplete(context, req, resp, inMessage); + try { + incomingObserver.onMessage(inMessage); + invokeComplete(context, req, resp, inMessage); + } catch (AbortedInvocationException ex) { + maybeResetAndCloseResponseOutputStream(resp); + if (ex.getRuntimeException() != null) { + throw ex.getRuntimeException(); + } else { + throw ex; + } + } } catch (SuspendedInvocationException ex) { if (ex.getRuntimeException() != null) { throw ex.getRuntimeException(); @@ -664,6 +674,21 @@ public abstract class AbstractHTTPDestination return responseStream; } + private void maybeResetAndCloseResponseOutputStream(HttpServletResponse response) 
throws IOException { + try { + // The Servlet API does not provide means to abort the response, the best + // we could do is reset buffers (only partial data is going to be sent) and close + // the connection. + if (!response.isCommitted()) { + response.setHeader(HttpHeaderHelper.CONNECTION, HttpHeaderHelper.CLOSE); + response.resetBuffer(); + response.getOutputStream().close(); + } + } catch (IllegalStateException ex) { + // response.getWriter() has already been called + } + } + private void closeResponseOutputStream(HttpServletResponse response) throws IOException { try { response.getOutputStream().close(); diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AbstractAttachmentChunkingTest.java similarity index 57% copy from systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java copy to systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AbstractAttachmentChunkingTest.java index 0dcdb267d2..dc304e332a 100644 --- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AbstractAttachmentChunkingTest.java @@ -25,47 +25,35 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.concurrent.ThreadLocalRandom; -import java.util.logging.Logger; import javax.activation.DataHandler; import javax.activation.DataSource; import javax.xml.ws.Binding; import javax.xml.ws.BindingProvider; -import javax.xml.ws.Endpoint; import javax.xml.ws.soap.SOAPBinding; -import javax.xml.ws.soap.SOAPFaultException; import org.apache.cxf.Download; import org.apache.cxf.DownloadFault_Exception; import org.apache.cxf.DownloadNextResponseType; -import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; import 
org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; -import org.apache.cxf.testutil.common.AbstractBusTestServerBase; -import org.junit.BeforeClass; import org.junit.Test; -import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { - private static final String PORT = allocatePort(DownloadServer.class); - private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class); - - private static final class DownloadImpl implements Download { +abstract class AbstractAttachmentChunkingTest extends AbstractBusClientServerTestBase { + protected static final class DownloadImpl implements Download { @Override - public DownloadNextResponseType downloadNext(Boolean simulate) { + public DownloadNextResponseType downloadNext(Integer minNumberOfChunks, Boolean simulate) { final DownloadNextResponseType responseType = new DownloadNextResponseType(); responseType.setDataContent(new DataHandler(new DataSource() { @Override public InputStream getInputStream() { if (simulate) { - return simulate(); + return simulate((minNumberOfChunks == null) ? 
1 : minNumberOfChunks); } else { return generate(100000); } @@ -90,49 +78,6 @@ public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { return responseType; } } - - public static class DownloadServer extends AbstractBusTestServerBase { - protected void run() { - Object implementor = new DownloadImpl(); - String address = "http://localhost:" + PORT + "/SoapContext/SoapPort"; - Endpoint.publish(address, implementor); - } - - public static void main(String[] args) { - try { - DownloadServer s = new DownloadServer(); - s.start(); - } catch (Exception ex) { - ex.printStackTrace(); - System.exit(-1); - } finally { - LOG.info("done!"); - } - } - } - - @BeforeClass - public static void startServers() throws Exception { - assertTrue("server did not launch correctly", launchServer(DownloadServer.class, true)); - } - - @Test - public void testChunkingPartialFailure() { - final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); - factory.setServiceClass(Download.class); - - final Download client = (Download) factory.create(); - final BindingProvider bindingProvider = (BindingProvider) client; - final Binding binding = bindingProvider.getBinding(); - - final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); - bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address", address); - ((SOAPBinding) binding).setMTOMEnabled(true); - - // See please https://issues.apache.org/jira/browse/CXF-9057 - SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true)); - assertThat(ex.getMessage(), containsString("simulated error during stream processing")); - } @Test public void testChunking() throws IOException, DownloadFault_Exception { @@ -143,24 +88,26 @@ public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { final BindingProvider bindingProvider = (BindingProvider) client; final Binding binding = bindingProvider.getBinding(); - final 
String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); + final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", getPort()); bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address", address); ((SOAPBinding) binding).setMTOMEnabled(true); - final DownloadNextResponseType response = client.downloadNext(false); - try (InputStream is = response.getDataContent().getInputStream()) { - assertThat(IOUtils.readBytesFromStream(is).length, equalTo(100000)); - } + final DownloadNextResponseType response = client.downloadNext(1, false); + assertThat(IOUtils.readBytesFromStream(response.getDataContent().getInputStream()).length, equalTo(100000)); } + protected abstract String getPort(); + private static InputStream generate(int size) { final byte[] buf = new byte[size]; Arrays.fill(buf, (byte) 'x'); return new ByteArrayInputStream(buf); } - private static InputStream simulate() { + private static InputStream simulate(final int minNumberOfChunks) { return new InputStream() { + private int chunk; + @Override public int read() { return (byte) 'x'; @@ -168,7 +115,7 @@ public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { @Override public int read(byte[] b, int off, int len) { - if (ThreadLocalRandom.current().nextBoolean()) { + if (chunk++ >= minNumberOfChunks && ThreadLocalRandom.current().nextBoolean()) { throw new IllegalArgumentException("simulated error during stream processing"); } diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java index 0dcdb267d2..6d395b6205 100644 --- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java @@ -19,16 +19,9 @@ package org.apache.cxf.systest.jaxws; -import 
java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Arrays; -import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; -import javax.activation.DataHandler; -import javax.activation.DataSource; import javax.xml.ws.Binding; import javax.xml.ws.BindingProvider; import javax.xml.ws.Endpoint; @@ -36,61 +29,23 @@ import javax.xml.ws.soap.SOAPBinding; import javax.xml.ws.soap.SOAPFaultException; import org.apache.cxf.Download; -import org.apache.cxf.DownloadFault_Exception; -import org.apache.cxf.DownloadNextResponseType; import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.ext.logging.LoggingFeature; import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; import org.junit.BeforeClass; import org.junit.Test; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { +public class AttachmentChunkingTest extends AbstractAttachmentChunkingTest { private static final String PORT = allocatePort(DownloadServer.class); private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class); - private static final class DownloadImpl implements Download { - @Override - public DownloadNextResponseType downloadNext(Boolean simulate) { - final DownloadNextResponseType responseType = new DownloadNextResponseType(); - responseType.setDataContent(new DataHandler(new DataSource() { - @Override - public InputStream getInputStream() { - if (simulate) { - return simulate(); - } else { - return generate(100000); - } - } - - 
@Override - public OutputStream getOutputStream() { - return null; - } - - @Override - public String getContentType() { - return ""; - } - - @Override - public String getName() { - return ""; - } - })); - - return responseType; - } - } - public static class DownloadServer extends AbstractBusTestServerBase { protected void run() { Object implementor = new DownloadImpl(); @@ -119,65 +74,24 @@ public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { @Test public void testChunkingPartialFailure() { final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setFeatures(Arrays.asList(new LoggingFeature())); factory.setServiceClass(Download.class); final Download client = (Download) factory.create(); final BindingProvider bindingProvider = (BindingProvider) client; final Binding binding = bindingProvider.getBinding(); - final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); + final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", getPort()); bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address", address); ((SOAPBinding) binding).setMTOMEnabled(true); // See please https://issues.apache.org/jira/browse/CXF-9057 - SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true)); + SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(1, true)); assertThat(ex.getMessage(), containsString("simulated error during stream processing")); } - - @Test - public void testChunking() throws IOException, DownloadFault_Exception { - final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); - factory.setServiceClass(Download.class); - final Download client = (Download) factory.create(); - final BindingProvider bindingProvider = (BindingProvider) client; - final Binding binding = bindingProvider.getBinding(); - - final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); - bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address", address); - ((SOAPBinding) binding).setMTOMEnabled(true); - - final DownloadNextResponseType response = client.downloadNext(false); - try (InputStream is = response.getDataContent().getInputStream()) { - assertThat(IOUtils.readBytesFromStream(is).length, equalTo(100000)); - } - } - - private static InputStream generate(int size) { - final byte[] buf = new byte[size]; - Arrays.fill(buf, (byte) 'x'); - return new ByteArrayInputStream(buf); - } - - private static InputStream simulate() { - return new InputStream() { - @Override - public int read() { - return (byte) 'x'; - } - - @Override - public int read(byte[] b, int off, int len) { - if (ThreadLocalRandom.current().nextBoolean()) { - throw new IllegalArgumentException("simulated error during stream processing"); - } - - for (int i = off; i < off + len; i++) { - b[i] = (byte) 'x'; - } - - return len; - } - }; + @Override + protected String getPort() { + return PORT; } } diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentMtomChunkingTest.java similarity index 54% copy from systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java copy to systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentMtomChunkingTest.java index 0dcdb267d2..ad527d86e4 100644 --- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentMtomChunkingTest.java @@ -19,83 +19,41 @@ package org.apache.cxf.systest.jaxws; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Arrays; -import java.util.concurrent.ThreadLocalRandom; import 
java.util.logging.Logger; -import javax.activation.DataHandler; -import javax.activation.DataSource; import javax.xml.ws.Binding; import javax.xml.ws.BindingProvider; import javax.xml.ws.Endpoint; +import javax.xml.ws.WebServiceException; import javax.xml.ws.soap.SOAPBinding; -import javax.xml.ws.soap.SOAPFaultException; import org.apache.cxf.Download; import org.apache.cxf.DownloadFault_Exception; import org.apache.cxf.DownloadNextResponseType; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.ext.logging.LoggingFeature; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; import org.junit.BeforeClass; import org.junit.Test; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { +public class AttachmentMtomChunkingTest extends AbstractAttachmentChunkingTest { private static final String PORT = allocatePort(DownloadServer.class); - private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class); - - private static final class DownloadImpl implements Download { - @Override - public DownloadNextResponseType downloadNext(Boolean simulate) { - final DownloadNextResponseType responseType = new DownloadNextResponseType(); - responseType.setDataContent(new DataHandler(new DataSource() { - @Override - public InputStream getInputStream() { - if (simulate) { - return simulate(); - } else { - return generate(100000); - } - } - - @Override - public OutputStream getOutputStream() { - return null; - } - - @Override - public String getContentType() { - return ""; - } - - @Override - public String 
getName() { - return ""; - } - })); - - return responseType; - } - } + private static final Logger LOG = LogUtils.getLogger(AttachmentMtomChunkingTest.class); public static class DownloadServer extends AbstractBusTestServerBase { protected void run() { Object implementor = new DownloadImpl(); String address = "http://localhost:" + PORT + "/SoapContext/SoapPort"; - Endpoint.publish(address, implementor); + final Endpoint endpoint = Endpoint.publish(address, implementor, new LoggingFeature()); + ((SOAPBinding)endpoint.getBinding()).setMTOMEnabled(true); } public static void main(String[] args) { @@ -115,69 +73,46 @@ public class AttachmentChunkingTest extends AbstractBusClientServerTestBase { public static void startServers() throws Exception { assertTrue("server did not launch correctly", launchServer(DownloadServer.class, true)); } - + @Test - public void testChunkingPartialFailure() { + public void testChunkingPartialEarlyFailure() throws IOException, DownloadFault_Exception { final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setFeatures(Arrays.asList(new LoggingFeature())); factory.setServiceClass(Download.class); final Download client = (Download) factory.create(); final BindingProvider bindingProvider = (BindingProvider) client; final Binding binding = bindingProvider.getBinding(); - final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); + final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", getPort()); bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address", address); ((SOAPBinding) binding).setMTOMEnabled(true); // See please https://issues.apache.org/jira/browse/CXF-9057 - SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true)); - assertThat(ex.getMessage(), containsString("simulated error during stream processing")); + assertThrows(WebServiceException.class, () -> 
client.downloadNext(1, true)); } - + @Test - public void testChunking() throws IOException, DownloadFault_Exception { + public void testChunkingPartialLateFailure() throws IOException, DownloadFault_Exception { final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setFeatures(Arrays.asList(new LoggingFeature())); factory.setServiceClass(Download.class); final Download client = (Download) factory.create(); final BindingProvider bindingProvider = (BindingProvider) client; final Binding binding = bindingProvider.getBinding(); - final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT); + final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", getPort()); bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address", address); ((SOAPBinding) binding).setMTOMEnabled(true); - final DownloadNextResponseType response = client.downloadNext(false); - try (InputStream is = response.getDataContent().getInputStream()) { - assertThat(IOUtils.readBytesFromStream(is).length, equalTo(100000)); - } - } - - private static InputStream generate(int size) { - final byte[] buf = new byte[size]; - Arrays.fill(buf, (byte) 'x'); - return new ByteArrayInputStream(buf); + // See please https://issues.apache.org/jira/browse/CXF-9057 + final DownloadNextResponseType response = client.downloadNext(10, true); + assertThrows(IOException.class, () -> IOUtils.readBytesFromStream(response.getDataContent().getInputStream())); } - - private static InputStream simulate() { - return new InputStream() { - @Override - public int read() { - return (byte) 'x'; - } - @Override - public int read(byte[] b, int off, int len) { - if (ThreadLocalRandom.current().nextBoolean()) { - throw new IllegalArgumentException("simulated error during stream processing"); - } - - for (int i = off; i < off + len; i++) { - b[i] = (byte) 'x'; - } - - return len; - } - }; + @Override + protected String getPort() 
{ + return PORT; } } diff --git a/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl b/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl index 721fa08bb4..9a83f31a96 100644 --- a/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl +++ b/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl @@ -21,6 +21,7 @@ </xs:complexType> <xs:complexType name="downloadNext"> <xs:sequence> + <xs:element minOccurs="0" name="minNumberOfChunks" type="xs:int"/> <xs:element minOccurs="0" name="simulate" type="xs:boolean"/> </xs:sequence> </xs:complexType>
