This is an automated email from the ASF dual-hosted git repository. dkulp pushed a commit to branch 3.6.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 2923b3324db22481b8f2b3868c3c66ddff29b222 Author: Daniel Kulp <d...@kulp.com> AuthorDate: Tue Sep 5 10:59:52 2023 -0400 Handle empty http responses better to make sure they are marked as complete (cherry picked from commit 48b89768082a31802f65ec9c5985ca7b7b5c0988) --- .../cxf/transport/http/HttpClientHTTPConduit.java | 91 +++++++++++++--------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index 9012f3a047..4cab8ae253 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -342,7 +342,6 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { int rtimeout; volatile Throwable exception; volatile boolean connectionComplete; - PipedInputStream pin; PipedOutputStream pout; HttpRequest request; @@ -367,7 +366,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { @Override protected void handleNoOutput() throws IOException { contentLen = 0; - pout.close(); + if (pout != null) { + pout.close(); + } if (exception != null) { if (exception instanceof IOException) { throw (IOException)exception; @@ -449,44 +450,50 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { String httpRequestMethod = (String)outMessage.get(Message.HTTP_REQUEST_METHOD); - pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 - ? 4096 : csPolicy.getChunkLength()); - pout = new PipedOutputStream(pin) { - synchronized boolean canWrite() throws IOException { - return isConnectionAttemptCompleted(csPolicy, this); - } - @Override - public void write(int b) throws IOException { - if (connectionComplete || canWrite()) { - super.write(b); - } - } - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (connectionComplete || canWrite()) { - super.write(b, off, len); - } - } - - }; if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) || PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) { contentLen = 0; } + final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 + ? 4096 : csPolicy.getChunkLength()); + if (contentLen != 0) { + pout = new PipedOutputStream(pin) { + synchronized boolean canWrite() throws IOException { + return isConnectionAttemptCompleted(csPolicy, this); + } + @Override + public void write(int b) throws IOException { + if (connectionComplete || canWrite()) { + super.write(b); + } + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (connectionComplete || canWrite()) { + super.write(b, off, len); + } + } + }; + } + BodyPublisher bp = new BodyPublisher() { @Override public void subscribe(Subscriber<? super ByteBuffer> subscriber) { connectionComplete = true; - synchronized (pout) { - pout.notifyAll(); + if (pout != null) { + synchronized (pout) { + pout.notifyAll(); + } + BodyPublishers.ofInputStream(new Supplier<InputStream>() { + public InputStream get() { + return pin; + } + }).subscribe(subscriber); + } else { + BodyPublishers.noBody().subscribe(subscriber); } - BodyPublishers.ofInputStream(new Supplier<InputStream>() { - public InputStream get() { - return pin; - } - }).subscribe(subscriber); } @Override @@ -526,8 +533,10 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { future = cl.sendAsync(request, handler); future.exceptionally(ex -> { - synchronized (pout) { - pout.notifyAll(); + if (pout != null) { + synchronized (pout) { + pout.notifyAll(); + } } return null; }); @@ -611,7 +620,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD); int sc = resp.statusCode(); if ("HEAD".equals(method)) { - return null; + try (InputStream in = resp.body()) { + return null; + } } if (sc == 204) { //no content @@ -623,16 +634,22 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { if (f.isPresent()) { long l = Long.parseLong(f.get()); if (l == 0) { - return null; + try (InputStream in = resp.body()) { + return null; + } } } else if (!fChunk.isPresent() || !"chunked".equals(fChunk.get())) { if (resp.version() == Version.HTTP_2) { InputStream in = resp.body(); if (in.available() <= 0) { - return null; + try (in) { + return null; + } } } else { - return null; + try (InputStream in = resp.body()) { + return null; + } } } } @@ -798,7 +815,9 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { @Override protected void retransmitStream() throws IOException { cachedStream.writeCacheTo(pout); - pout.close(); + if (pout != null) { + pout.close(); + } } @Override