This is an automated email from the ASF dual-hosted git repository. kwin pushed a commit to branch feature/jdk-transport-prevent-temp-file-for-put in repository https://gitbox.apache.org/repos/asf/maven-resolver.git
commit 9dd1308f01deec27615d70431b99b59031889ded Author: Konrad Windszus <[email protected]> AuthorDate: Fri Jan 16 18:36:27 2026 +0100 JDK Transport: Do no longer leverage temp file for transfering artifact with PUT Directly transfer based on the PutTask's InputStream. Improve retry handling to not retry (some) BodyPublisher exceptions. --- .../test/util/http/HttpTransporterTest.java | 5 +- .../aether/transport/jdk/JdkTransporter.java | 54 +++++++++--- .../jdk/TransportListenerAwareInputStream.java | 98 ++++++++++++++++++++++ 3 files changed, 144 insertions(+), 13 deletions(-) diff --git a/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpTransporterTest.java b/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpTransporterTest.java index 6f8e65294..e5db78b82 100644 --- a/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpTransporterTest.java +++ b/maven-resolver-test-http/src/main/java/org/eclipse/aether/internal/test/util/http/HttpTransporterTest.java @@ -835,7 +835,8 @@ public class HttpTransporterTest { transporter.put(task); assertEquals(0L, listener.getDataOffset()); assertEquals(0L, listener.getDataLength()); - assertEquals(1, listener.getStartedCount()); + // some transports may skip the upload for empty resources + assertTrue(listener.getStartedCount() <= 1, "The transport should be started at most once but was started " + listener.getStartedCount() + " times"); assertEquals(0, listener.getProgressedCount()); assertEquals("", TestFileUtils.readString(new File(repoDir, "file.txt"))); } @@ -991,7 +992,7 @@ public class HttpTransporterTest { transporter.put(task); assertEquals(0L, listener.getDataOffset()); assertEquals(6L, listener.getDataLength()); - assertEquals(1, listener.getStartedCount()); + assertTrue(listener.getStartedCount() > 0 && listener.getStartedCount() <= 2, "Started count: " + listener.getStartedCount()); assertTrue(listener.getProgressedCount() > 0, "Count: " + listener.getProgressedCount()); assertEquals("upload", TestFileUtils.readString(new File(repoDir, "file.txt"))); } diff --git a/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/JdkTransporter.java b/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/JdkTransporter.java index f5b2b29a4..86dd559f6 100644 --- a/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/JdkTransporter.java +++ b/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/JdkTransporter.java @@ -29,6 +29,7 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Authenticator; @@ -61,6 +62,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.function.Function; @@ -83,6 +85,7 @@ import org.eclipse.aether.spi.connector.transport.http.HttpTransporter; import org.eclipse.aether.spi.connector.transport.http.HttpTransporterException; import org.eclipse.aether.spi.io.PathProcessor; import org.eclipse.aether.transfer.NoTransporterException; +import org.eclipse.aether.transfer.TransferCancelledException; import org.eclipse.aether.util.ConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -435,19 +438,30 @@ final class JdkTransporter extends AbstractTransporter implements HttpTransporte request = request.expectContinue(expectContinue); } headers.forEach(request::setHeader); - try (PathProcessor.TempFile tempFile = pathProcessor.newTempFile()) { - utilPut(task, Files.newOutputStream(tempFile.getPath()), true); - request.PUT(HttpRequest.BodyPublishers.ofFile(tempFile.getPath())); - prepare(request); + request.PUT(HttpRequest.BodyPublishers.ofInputStream(() -> { try { - HttpResponse<Void> response = send(request.build(), HttpResponse.BodyHandlers.discarding()); - if (response.statusCode() >= MULTIPLE_CHOICES) { - throw new HttpTransporterException(response.statusCode()); - } - } catch (ConnectException e) { - throw enhance(e); + return new TransportListenerAwareInputStream( + task.newInputStream(), task.getListener(), task.getDataLength()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + })); + prepare(request); + try { + HttpResponse<Void> response = send(request.build(), HttpResponse.BodyHandlers.discarding()); + if (response.statusCode() >= MULTIPLE_CHOICES) { + throw new HttpTransporterException(response.statusCode()); } + } catch (ConnectException e) { + throw enhance(e); + } catch (IOException e) { + // unwrap possible underlying exception from body supplier + Throwable rootCause = getRootCause(e); + if (rootCause instanceof TransferCancelledException) { + throw (TransferCancelledException) rootCause; + } + throw e; } } @@ -650,7 +664,16 @@ final class JdkTransporter extends AbstractTransporter implements HttpTransporte // e.g. for connection timeouts this is hardcoded to 2 attempts: // https://github.com/openjdk/jdk/blob/640343f7d94894b0378ea5b1768eeac203a9aaf8/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java#L665 .maxRetries(retryCount) - .onException(t -> t instanceof IOException && !NON_RETRIABLE_IO_EXCEPTIONS.contains(t.getClass())) + .onException(t -> { + // exceptions from body publishers are wrapped inside IOExceptions + // but hard to distinguish from others, so just exclude some we know are emitted from body + // suppliers (https://github.com/mizosoft/methanol/issues/179) + Throwable rootCause = getRootCause(t); + // TODO: unwrap uncheckedIOException due to https://bugs.openjdk.org/browse/JDK-8367067 + return t instanceof IOException + && !NON_RETRIABLE_IO_EXCEPTIONS.contains(t.getClass()) + && !(rootCause instanceof TransferCancelledException); + }) .build(); builder.interceptor(retryIoExceptionsInterceptor); } @@ -694,4 +717,13 @@ final class JdkTransporter extends AbstractTransporter implements HttpTransporte throw new IllegalStateException(e); } } + + private static Throwable getRootCause(Throwable throwable) { + Objects.requireNonNull(throwable); + Throwable rootCause = throwable; + while (rootCause.getCause() != null && rootCause.getCause() != rootCause) { + rootCause = rootCause.getCause(); + } + return rootCause; + } } diff --git a/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/TransportListenerAwareInputStream.java b/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/TransportListenerAwareInputStream.java new file mode 100644 index 000000000..adf4b3cb8 --- /dev/null +++ b/maven-resolver-transport-jdk-parent/maven-resolver-transport-jdk11/src/main/java/org/eclipse/aether/transport/jdk/TransportListenerAwareInputStream.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.eclipse.aether.transport.jdk; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; + +import org.eclipse.aether.spi.connector.transport.TransportListener; +import org.eclipse.aether.transfer.TransferCancelledException; + +/** + * An InputStream wrapper that notifies a {@link TransportListener} about progress when data is read. + * It throws {@link InterruptedIOException} with a cause of {@link TransferCancelledException} when the transfer is cancelled in the transport listener. + */ +public class TransportListenerAwareInputStream extends FilterInputStream { + + private final TransportListener transportListener; + private final long size; + private boolean isStarted = false; + + protected TransportListenerAwareInputStream(InputStream in, TransportListener transportListener, long size) { + super(in); + this.transportListener = transportListener; + this.size = size; + } + + @Override + public int read() throws IOException { + int byteRead = super.read(); + if (byteRead != -1) { + if (!isStarted) { + notifyStarted(); + } + notifyProgress(new byte[] {(byte) byteRead}, 0, 1); + } + return byteRead; + } + + @Override + public int read(byte[] b) throws IOException { + int numBytesRead = super.read(b); + if (numBytesRead != -1) { + if (!isStarted) { + notifyStarted(); + } + notifyProgress(b, 0, numBytesRead); + } + return super.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int numBytesRead = super.read(b, off, len); + if (numBytesRead != -1) { + if (!isStarted) { + notifyStarted(); + } + notifyProgress(b, off, numBytesRead); + } + return numBytesRead; + } + + private void notifyProgress(byte[] buffer, int offset, int numBytesRead) throws IOException { + try { + transportListener.transportProgressed(ByteBuffer.wrap(buffer, offset, numBytesRead)); + } catch (TransferCancelledException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + + private void notifyStarted() throws IOException { + try { + transportListener.transportStarted(0, size); + } catch (TransferCancelledException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + isStarted = true; + } +}
