This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.5.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 583cc8d562ba8e629227ca3c083577b553390d97
Author: Andriy Redko <[email protected]>
AuthorDate: Sat Feb 15 10:42:30 2025 -0500

    CXF-9057: Chunked Stream is closed regularly when Exception is thrown 
(MTOM) (#2234)
    
    (cherry picked from commit e28d4a77a7ee59d03219ff4767eae1d7bedeb207)
    (cherry picked from commit 7f14b6284f14e4ebe18e4f8411687d5120e91f55)
    
    # Conflicts:
    #       
systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
    (cherry picked from commit 07b6af3eb368e691a85e15ad77e07aedfc6a9d33)
    
    # Conflicts:
    #       
systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
---
 .../cxf/phase/AbortedInvocationException.java      |  48 +++++++++
 .../apache/cxf/phase/PhaseInterceptorChain.java    |   3 +
 .../interceptor/Soap11FaultOutInterceptor.java     |   8 +-
 .../interceptor/Soap12FaultOutInterceptor.java     |   8 +-
 .../transport/http/AbstractHTTPDestination.java    |  29 +++++-
 ...st.java => AbstractAttachmentChunkingTest.java} |  79 +++------------
 .../cxf/systest/jaxws/AttachmentChunkingTest.java  | 102 ++------------------
 ...ngTest.java => AttachmentMtomChunkingTest.java} | 107 ++++-----------------
 .../src/test/resources/attachments/cxf9057.wsdl    |   1 +
 9 files changed, 135 insertions(+), 250 deletions(-)

diff --git 
a/core/src/main/java/org/apache/cxf/phase/AbortedInvocationException.java 
b/core/src/main/java/org/apache/cxf/phase/AbortedInvocationException.java
new file mode 100644
index 0000000000..ee2553d1af
--- /dev/null
+++ b/core/src/main/java/org/apache/cxf/phase/AbortedInvocationException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.phase;
+
+/**
+ * Represents transport-specific exceptions which are used to indicate that
+ * a given invocation was suspended
+ */
+public class AbortedInvocationException extends RuntimeException {
+
+    private static final long serialVersionUID = 6889545463301144757L;
+
+
+    public AbortedInvocationException(Throwable cause) {
+        super(cause);
+    }
+
+    public AbortedInvocationException() {
+    }
+
+
+    /**
+     * Returns a transport-specific runtime exception
+     * @return RuntimeException the transport-specific runtime exception,
+     *         can be null for asynchronous transports
+     */
+    public RuntimeException getRuntimeException() {
+        Throwable ex = getCause();
+        return ex instanceof RuntimeException ? (RuntimeException)ex : null;
+    }
+}
diff --git a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java 
b/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
index c8a52a8e95..316e58d26f 100644
--- a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
+++ b/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
@@ -325,6 +325,9 @@ public class PhaseInterceptorChain implements 
InterceptorChain {
                     }
                     pause();
                     throw ex;
+                } catch (AbortedInvocationException ex) {
+                    abort();
+                    throw ex;
                 } catch (RuntimeException ex) {
                     if (!faultOccurred) {
                         faultOccurred = true;
diff --git 
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java
 
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java
index 5e612a5932..95ebee9f7c 100644
--- 
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java
+++ 
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java
@@ -37,6 +37,7 @@ import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.AbortedInvocationException;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.staxutils.StaxUtils;
 
@@ -69,7 +70,12 @@ public class Soap11FaultOutInterceptor extends 
AbstractSoapInterceptor {
             // have been streaming some data already and may not be able to 
inject a fault in the middle 
             // of the data transfer.
             if (MessageUtils.getContextualBoolean(message, 
Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
-                throw f;
+                // Signal that response has to be aborted midway
+                if (MessageUtils.getContextualBoolean(message, 
Message.MTOM_ENABLED, false)) {
+                    throw new AbortedInvocationException(f);
+                } else {
+                    throw f;
+                }
             }
 
             XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
diff --git 
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java
 
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java
index 35cd7498a6..e766282c02 100644
--- 
a/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java
+++ 
b/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java
@@ -38,6 +38,7 @@ import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.AbortedInvocationException;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.staxutils.StaxUtils;
 
@@ -71,7 +72,12 @@ public class Soap12FaultOutInterceptor extends 
AbstractSoapInterceptor {
             // have been streaming some data already and may not be able to 
inject a fault in the middle 
             // of the data transfer.
             if (MessageUtils.getContextualBoolean(message, 
Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
-                throw f;
+                // Signal that response has to be aborted midway
+                if (MessageUtils.getContextualBoolean(message, 
Message.MTOM_ENABLED, false)) {
+                    throw new AbortedInvocationException(f);
+                } else {
+                    throw f;
+                }
             }
 
             XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
diff --git 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
index 29f93b81fc..3933f7d494 100644
--- 
a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
+++ 
b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
@@ -63,6 +63,7 @@ import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.AbortedInvocationException;
 import org.apache.cxf.policy.PolicyDataEngine;
 import org.apache.cxf.security.SecurityContext;
 import org.apache.cxf.security.transport.TLSSessionInfo;
@@ -262,8 +263,17 @@ public abstract class AbstractHTTPDestination
         copyKnownRequestAttributes(req, inMessage);
 
         try {
-            incomingObserver.onMessage(inMessage);
-            invokeComplete(context, req, resp, inMessage);
+            try {
+                incomingObserver.onMessage(inMessage);
+                invokeComplete(context, req, resp, inMessage);
+            } catch (AbortedInvocationException ex) {
+                maybeResetAndCloseResponseOutputStream(resp);
+                if (ex.getRuntimeException() != null) {
+                    throw ex.getRuntimeException();
+                } else {
+                    throw ex;
+                }
+            } 
         } catch (SuspendedInvocationException ex) {
             if (ex.getRuntimeException() != null) {
                 throw ex.getRuntimeException();
@@ -664,6 +674,21 @@ public abstract class AbstractHTTPDestination
         return responseStream;
     }
 
+    private void maybeResetAndCloseResponseOutputStream(HttpServletResponse 
response) throws IOException {
+        try {
+            // The Servlet API does not provide means to abort the response, 
the best
+            // we could do is reset buffers (only partial data is going to be 
sent) and close
+            // the connection.
+            if (!response.isCommitted()) {
+                response.setHeader(HttpHeaderHelper.CONNECTION, 
HttpHeaderHelper.CLOSE);
+                response.resetBuffer();
+                response.getOutputStream().close();
+            }
+        } catch (IllegalStateException ex) {
+            // response.getWriter() has already been called
+        }
+    }
+
     private void closeResponseOutputStream(HttpServletResponse response) 
throws IOException {
         try {
             response.getOutputStream().close();
diff --git 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AbstractAttachmentChunkingTest.java
similarity index 57%
copy from 
systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
copy to 
systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AbstractAttachmentChunkingTest.java
index 0dcdb267d2..dc304e332a 100644
--- 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
+++ 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AbstractAttachmentChunkingTest.java
@@ -25,47 +25,35 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.logging.Logger;
 
 import javax.activation.DataHandler;
 import javax.activation.DataSource;
 import javax.xml.ws.Binding;
 import javax.xml.ws.BindingProvider;
-import javax.xml.ws.Endpoint;
 import javax.xml.ws.soap.SOAPBinding;
-import javax.xml.ws.soap.SOAPFaultException;
 
 import org.apache.cxf.Download;
 import org.apache.cxf.DownloadFault_Exception;
 import org.apache.cxf.DownloadNextResponseType;
-import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
-import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
 
-public class AttachmentChunkingTest extends AbstractBusClientServerTestBase {
-    private static final String PORT = allocatePort(DownloadServer.class);
-    private static final Logger LOG = 
LogUtils.getLogger(AttachmentChunkingTest.class);
-
-    private static final class DownloadImpl implements Download {
+abstract class AbstractAttachmentChunkingTest extends 
AbstractBusClientServerTestBase {
+    protected static final class DownloadImpl implements Download {
         @Override
-        public DownloadNextResponseType downloadNext(Boolean simulate) {
+        public DownloadNextResponseType downloadNext(Integer 
minNumberOfChunks, Boolean simulate) {
             final DownloadNextResponseType responseType = new 
DownloadNextResponseType();
             responseType.setDataContent(new DataHandler(new DataSource() {
                 @Override
                 public InputStream getInputStream() {
                     if (simulate) {
-                        return simulate();
+                        return simulate((minNumberOfChunks == null) ? 1 : 
minNumberOfChunks);
                     } else {
                         return generate(100000);
                     }
@@ -90,49 +78,6 @@ public class AttachmentChunkingTest extends 
AbstractBusClientServerTestBase {
             return responseType;
         }
     }
-
-    public static class DownloadServer extends AbstractBusTestServerBase {
-        protected void run() {
-            Object implementor = new DownloadImpl();
-            String address = "http://localhost:"; + PORT + 
"/SoapContext/SoapPort";
-            Endpoint.publish(address, implementor);
-        }
-
-        public static void main(String[] args) {
-            try {
-                DownloadServer s = new DownloadServer();
-                s.start();
-            } catch (Exception ex) {
-                ex.printStackTrace();
-                System.exit(-1);
-            } finally {
-                LOG.info("done!");
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void startServers() throws Exception {
-        assertTrue("server did not launch correctly", 
launchServer(DownloadServer.class, true));
-    }
-
-    @Test
-    public void testChunkingPartialFailure() {
-        final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
-        factory.setServiceClass(Download.class);
-
-        final Download client = (Download) factory.create();
-        final BindingProvider bindingProvider = (BindingProvider) client;
-        final Binding binding = bindingProvider.getBinding();
-
-        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, PORT);
-        
bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address",
 address);
-        ((SOAPBinding) binding).setMTOMEnabled(true);
-
-        // See please https://issues.apache.org/jira/browse/CXF-9057
-        SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> 
client.downloadNext(true));
-        assertThat(ex.getMessage(), containsString("simulated error during 
stream processing"));
-    }
     
     @Test
     public void testChunking() throws IOException, DownloadFault_Exception {
@@ -143,24 +88,26 @@ public class AttachmentChunkingTest extends 
AbstractBusClientServerTestBase {
         final BindingProvider bindingProvider = (BindingProvider) client;
         final Binding binding = bindingProvider.getBinding();
 
-        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, PORT);
+        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, 
getPort());
         
bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address",
 address);
         ((SOAPBinding) binding).setMTOMEnabled(true);
 
-        final DownloadNextResponseType response = client.downloadNext(false);
-        try (InputStream is = response.getDataContent().getInputStream()) {
-            assertThat(IOUtils.readBytesFromStream(is).length, 
equalTo(100000));
-        }
+        final DownloadNextResponseType response = client.downloadNext(1, 
false);
+        
assertThat(IOUtils.readBytesFromStream(response.getDataContent().getInputStream()).length,
 equalTo(100000));
     }
     
+    protected abstract String getPort();
+    
     private static InputStream generate(int size) {
         final byte[] buf = new byte[size];
         Arrays.fill(buf, (byte) 'x');
         return new ByteArrayInputStream(buf);
     }
     
-    private static InputStream simulate() {
+    private static InputStream simulate(final int minNumberOfChunks) {
         return new InputStream() {
+            private int chunk;
+
             @Override
             public int read() {
                 return (byte) 'x';
@@ -168,7 +115,7 @@ public class AttachmentChunkingTest extends 
AbstractBusClientServerTestBase {
 
             @Override
             public int read(byte[] b, int off, int len) {
-                if (ThreadLocalRandom.current().nextBoolean()) {
+                if (chunk++ >= minNumberOfChunks && 
ThreadLocalRandom.current().nextBoolean()) {
                     throw new IllegalArgumentException("simulated error during 
stream processing");
                 }
 
diff --git 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
index 0dcdb267d2..6d395b6205 100644
--- 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
+++ 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
@@ -19,16 +19,9 @@
 
 package org.apache.cxf.systest.jaxws;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.logging.Logger;
 
-import javax.activation.DataHandler;
-import javax.activation.DataSource;
 import javax.xml.ws.Binding;
 import javax.xml.ws.BindingProvider;
 import javax.xml.ws.Endpoint;
@@ -36,61 +29,23 @@ import javax.xml.ws.soap.SOAPBinding;
 import javax.xml.ws.soap.SOAPFaultException;
 
 import org.apache.cxf.Download;
-import org.apache.cxf.DownloadFault_Exception;
-import org.apache.cxf.DownloadNextResponseType;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.ext.logging.LoggingFeature;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
-public class AttachmentChunkingTest extends AbstractBusClientServerTestBase {
+public class AttachmentChunkingTest extends AbstractAttachmentChunkingTest {
     private static final String PORT = allocatePort(DownloadServer.class);
     private static final Logger LOG = 
LogUtils.getLogger(AttachmentChunkingTest.class);
 
-    private static final class DownloadImpl implements Download {
-        @Override
-        public DownloadNextResponseType downloadNext(Boolean simulate) {
-            final DownloadNextResponseType responseType = new 
DownloadNextResponseType();
-            responseType.setDataContent(new DataHandler(new DataSource() {
-                @Override
-                public InputStream getInputStream() {
-                    if (simulate) {
-                        return simulate();
-                    } else {
-                        return generate(100000);
-                    }
-                }
-
-                @Override
-                public OutputStream getOutputStream() {
-                    return null;
-                }
-
-                @Override
-                public String getContentType() {
-                    return "";
-                }
-
-                @Override
-                public String getName() {
-                    return "";
-                }
-            }));
-
-            return responseType;
-        }
-    }
-
     public static class DownloadServer extends AbstractBusTestServerBase {
         protected void run() {
             Object implementor = new DownloadImpl();
@@ -119,65 +74,24 @@ public class AttachmentChunkingTest extends 
AbstractBusClientServerTestBase {
     @Test
     public void testChunkingPartialFailure() {
         final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setFeatures(Arrays.asList(new LoggingFeature()));
         factory.setServiceClass(Download.class);
 
         final Download client = (Download) factory.create();
         final BindingProvider bindingProvider = (BindingProvider) client;
         final Binding binding = bindingProvider.getBinding();
 
-        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, PORT);
+        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, 
getPort());
         
bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address",
 address);
         ((SOAPBinding) binding).setMTOMEnabled(true);
 
         // See please https://issues.apache.org/jira/browse/CXF-9057
-        SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> 
client.downloadNext(true));
+        SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> 
client.downloadNext(1, true));
         assertThat(ex.getMessage(), containsString("simulated error during 
stream processing"));
     }
-    
-    @Test
-    public void testChunking() throws IOException, DownloadFault_Exception {
-        final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
-        factory.setServiceClass(Download.class);
 
-        final Download client = (Download) factory.create();
-        final BindingProvider bindingProvider = (BindingProvider) client;
-        final Binding binding = bindingProvider.getBinding();
-
-        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, PORT);
-        
bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address",
 address);
-        ((SOAPBinding) binding).setMTOMEnabled(true);
-
-        final DownloadNextResponseType response = client.downloadNext(false);
-        try (InputStream is = response.getDataContent().getInputStream()) {
-            assertThat(IOUtils.readBytesFromStream(is).length, 
equalTo(100000));
-        }
-    }
-    
-    private static InputStream generate(int size) {
-        final byte[] buf = new byte[size];
-        Arrays.fill(buf, (byte) 'x');
-        return new ByteArrayInputStream(buf);
-    }
-    
-    private static InputStream simulate() {
-        return new InputStream() {
-            @Override
-            public int read() {
-                return (byte) 'x';
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) {
-                if (ThreadLocalRandom.current().nextBoolean()) {
-                    throw new IllegalArgumentException("simulated error during 
stream processing");
-                }
-
-                for (int i = off; i < off + len; i++) {
-                    b[i] = (byte) 'x';
-                }
-
-                return len;
-            }
-        };
+    @Override
+    protected String getPort() {
+        return PORT;
     }
 }
diff --git 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentMtomChunkingTest.java
similarity index 54%
copy from 
systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
copy to 
systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentMtomChunkingTest.java
index 0dcdb267d2..ad527d86e4 100644
--- 
a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentChunkingTest.java
+++ 
b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/AttachmentMtomChunkingTest.java
@@ -19,83 +19,41 @@
 
 package org.apache.cxf.systest.jaxws;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.logging.Logger;
 
-import javax.activation.DataHandler;
-import javax.activation.DataSource;
 import javax.xml.ws.Binding;
 import javax.xml.ws.BindingProvider;
 import javax.xml.ws.Endpoint;
+import javax.xml.ws.WebServiceException;
 import javax.xml.ws.soap.SOAPBinding;
-import javax.xml.ws.soap.SOAPFaultException;
 
 import org.apache.cxf.Download;
 import org.apache.cxf.DownloadFault_Exception;
 import org.apache.cxf.DownloadNextResponseType;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.ext.logging.LoggingFeature;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
-import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
-public class AttachmentChunkingTest extends AbstractBusClientServerTestBase {
+public class AttachmentMtomChunkingTest extends AbstractAttachmentChunkingTest 
{
     private static final String PORT = allocatePort(DownloadServer.class);
-    private static final Logger LOG = 
LogUtils.getLogger(AttachmentChunkingTest.class);
-
-    private static final class DownloadImpl implements Download {
-        @Override
-        public DownloadNextResponseType downloadNext(Boolean simulate) {
-            final DownloadNextResponseType responseType = new 
DownloadNextResponseType();
-            responseType.setDataContent(new DataHandler(new DataSource() {
-                @Override
-                public InputStream getInputStream() {
-                    if (simulate) {
-                        return simulate();
-                    } else {
-                        return generate(100000);
-                    }
-                }
-
-                @Override
-                public OutputStream getOutputStream() {
-                    return null;
-                }
-
-                @Override
-                public String getContentType() {
-                    return "";
-                }
-
-                @Override
-                public String getName() {
-                    return "";
-                }
-            }));
-
-            return responseType;
-        }
-    }
+    private static final Logger LOG = 
LogUtils.getLogger(AttachmentMtomChunkingTest.class);
 
     public static class DownloadServer extends AbstractBusTestServerBase {
         protected void run() {
             Object implementor = new DownloadImpl();
             String address = "http://localhost:"; + PORT + 
"/SoapContext/SoapPort";
-            Endpoint.publish(address, implementor);
+            final Endpoint endpoint = Endpoint.publish(address, implementor, 
new LoggingFeature());
+            ((SOAPBinding)endpoint.getBinding()).setMTOMEnabled(true);
         }
 
         public static void main(String[] args) {
@@ -115,69 +73,46 @@ public class AttachmentChunkingTest extends 
AbstractBusClientServerTestBase {
     public static void startServers() throws Exception {
         assertTrue("server did not launch correctly", 
launchServer(DownloadServer.class, true));
     }
-
+    
     @Test
-    public void testChunkingPartialFailure() {
+    public void testChunkingPartialEarlyFailure() throws IOException, 
DownloadFault_Exception {
         final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setFeatures(Arrays.asList(new LoggingFeature()));
         factory.setServiceClass(Download.class);
 
         final Download client = (Download) factory.create();
         final BindingProvider bindingProvider = (BindingProvider) client;
         final Binding binding = bindingProvider.getBinding();
 
-        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, PORT);
+        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, 
getPort());
         
bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address",
 address);
         ((SOAPBinding) binding).setMTOMEnabled(true);
 
         // See please https://issues.apache.org/jira/browse/CXF-9057
-        SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> 
client.downloadNext(true));
-        assertThat(ex.getMessage(), containsString("simulated error during 
stream processing"));
+        assertThrows(WebServiceException.class, () -> client.downloadNext(1, 
true));
     }
-    
+
     @Test
-    public void testChunking() throws IOException, DownloadFault_Exception {
+    public void testChunkingPartialLateFailure() throws IOException, 
DownloadFault_Exception {
         final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+        factory.setFeatures(Arrays.asList(new LoggingFeature()));
         factory.setServiceClass(Download.class);
 
         final Download client = (Download) factory.create();
         final BindingProvider bindingProvider = (BindingProvider) client;
         final Binding binding = bindingProvider.getBinding();
 
-        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, PORT);
+        final String address = 
String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort";, 
getPort());
         
bindingProvider.getRequestContext().put("javax.xml.ws.service.endpoint.address",
 address);
         ((SOAPBinding) binding).setMTOMEnabled(true);
 
-        final DownloadNextResponseType response = client.downloadNext(false);
-        try (InputStream is = response.getDataContent().getInputStream()) {
-            assertThat(IOUtils.readBytesFromStream(is).length, 
equalTo(100000));
-        }
-    }
-    
-    private static InputStream generate(int size) {
-        final byte[] buf = new byte[size];
-        Arrays.fill(buf, (byte) 'x');
-        return new ByteArrayInputStream(buf);
+        // See please https://issues.apache.org/jira/browse/CXF-9057
+        final DownloadNextResponseType response = client.downloadNext(10, 
true);
+        assertThrows(IOException.class, () -> 
IOUtils.readBytesFromStream(response.getDataContent().getInputStream()));
     }
-    
-    private static InputStream simulate() {
-        return new InputStream() {
-            @Override
-            public int read() {
-                return (byte) 'x';
-            }
 
-            @Override
-            public int read(byte[] b, int off, int len) {
-                if (ThreadLocalRandom.current().nextBoolean()) {
-                    throw new IllegalArgumentException("simulated error during 
stream processing");
-                }
-
-                for (int i = off; i < off + len; i++) {
-                    b[i] = (byte) 'x';
-                }
-
-                return len;
-            }
-        };
+    @Override
+    protected String getPort() {
+        return PORT;
     }
 }
diff --git a/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl 
b/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl
index 721fa08bb4..9a83f31a96 100644
--- a/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl
+++ b/systests/jaxws/src/test/resources/attachments/cxf9057.wsdl
@@ -21,6 +21,7 @@
             </xs:complexType>
             <xs:complexType name="downloadNext">
                 <xs:sequence>
+                    <xs:element minOccurs="0" name="minNumberOfChunks" 
type="xs:int"/>
                     <xs:element minOccurs="0" name="simulate" 
type="xs:boolean"/>
                 </xs:sequence>
             </xs:complexType>

Reply via email to