[
https://issues.apache.org/jira/browse/SSHD-966?focusedWorklogId=761516&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761516
]
ASF GitHub Bot logged work on SSHD-966:
---------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/22 21:04
Start Date: 24/Apr/22 21:04
Worklog Time Spent: 10m
Work Description: tomaswolf commented on code in PR #217:
URL: https://github.com/apache/mina-sshd/pull/217#discussion_r857179032
##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+ // With asynchronous flushing we get a classic producer-consumer problem.
The flushing thread is the single
+ // consumer, and there is a risk that it might get overrun by the
producers. The classical solution of using a
+ // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we
cannot make the producers block when the queue
+ // is full; we might deadlock or be unable to handle any incoming message.
+ //
+ // We need an unbounded queue that never blocks the producers, but that
manages to throttle them such that the
+ // flushing thread can actually finish, and we still can handle incoming
messages (in particular also the peer's
+ // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our
own SSH_MSG_NEW_KEYS).
+ //
+ // This is achieved by giving the flushing thread priority over the
threads that might enqueue additional packets
+ // and flushing at least two packets at a time. Additionally the flush
loop releases and shortly afterwards
+ // re-acquires the write lock, so normally not many readers (i.e.,
writePacket() calls) will get a chance to enqueue
+ // new packets.
+
+ /**
+ * We need the flushing thread to have priority over writing threads. So
we use a lock that favors writers over
+ * readers, and any state updates and the flushing thread are writers,
while writePacket() is a reader.
+ */
+ private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(false);
Review Comment:
Done.
##########
sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.session.helpers;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.security.GeneralSecurityException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.kex.KexState;
+import org.apache.sshd.common.util.ExceptionUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Manages SSH message sending during a key exchange. RFC 4253 specifies that
during a key exchange, no high-level
+ * messages are to be sent, but a receiver must be able to deal with messages
"in flight" until the peer's
+ * {@link SshConstants#SSH_MSG_KEX_INIT} message is received.
+ * <p>
+ * Apache MINA sshd queues up high-level messages that threads try to send
while a key exchange is ongoing, and sends
+ * them once the key exchange is done. Sending queued messages may make the
peer re-trigger a new key exchange, in which
+ * case sending queued messages stops and is resumed at the end of the new key
exchange.
+ * </p>
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4253#section-7">RFC 4253</a>
+ */
+public class KeyExchangeMessageHandler {
+
+ // With asynchronous flushing we get a classic producer-consumer problem.
The flushing thread is the single
+ // consumer, and there is a risk that it might get overrun by the
producers. The classical solution of using a
+ // LinkedBlockingQueue with a fixed maximum capacity doesn't work: we
cannot make the producers block when the queue
+ // is full; we might deadlock or be unable to handle any incoming message.
+ //
+ // We need an unbounded queue that never blocks the producers, but that
manages to throttle them such that the
+ // flushing thread can actually finish, and we still can handle incoming
messages (in particular also the peer's
+ // SSH_MSG_NEW_KEYS, since we start flushing already after having sent our
own SSH_MSG_NEW_KEYS).
+ //
+ // This is achieved by giving the flushing thread priority over the
threads that might enqueue additional packets
+ // and flushing at least two packets at a time. Additionally the flush
loop releases and shortly afterwards
+ // re-acquires the write lock, so normally not many readers (i.e.,
writePacket() calls) will get a chance to enqueue
+ // new packets.
+
+ /**
+ * We need the flushing thread to have priority over writing threads. So
we use a lock that favors writers over
+ * readers, and any state updates and the flushing thread are writers,
while writePacket() is a reader.
+ */
+ private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(false);
+
+ private final ExecutorService flushRunner =
Executors.newSingleThreadExecutor();
+
+ private final AbstractSession session;
+
+ private final Logger log;
+
+ /**
+ * Queues up high-level packets written during an ongoing key exchange.
+ */
+ private final Queue<PendingWriteFuture> pendingPackets = new
LinkedList<>();
+
+ /**
+ * Indicates that all pending packets have been flushed.
+ */
+ private boolean kexFlushed = true;
+
+ /**
+ * Never {@code null}. Used to block some threads when writing packets
while pending packets are still being flushed
+ * at the end of a KEX to avoid overrunning the flushing thread. Always
set, initially fulfilled. At the beginning
+ * of a KEX a new future is installed, which is fulfilled at the end of
the KEX once there are no more pending
+ * packets to be flushed.
+ */
+ private DefaultKeyExchangeFuture kexFlushedFuture;
+
+ /**
+ * Creates a new {@link KeyExchangeMessageHandler} for the given {@code
session}, using the given {@code Logger}.
+ *
+ * @param session {@link AbstractSession} the new instance belongs to
+ * @param log {@link Logger} to use for writing log messages
+ */
+ public KeyExchangeMessageHandler(AbstractSession session, Logger log) {
+ this.session = Objects.requireNonNull(session);
+ this.log = Objects.requireNonNull(log);
+ // Start with a fulfilled kexFlushed future.
+ kexFlushedFuture = new DefaultKeyExchangeFuture(session.toString(),
session.getFutureLock());
+ kexFlushedFuture.setValue(Boolean.TRUE);
+ }
+
+ public void updateState(Runnable update) {
+ lock.writeLock().lock();
+ try {
+ update.run();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public <V> V updateState(Supplier<V> update) {
+ lock.writeLock().lock();
+ try {
+ return update.get();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Initializes the state for a new key exchange. {@link
#allPacketsFlushed()} will be {@code false}, and a new
+ * future to be fulfilled when all queued packets will be flushed once the
key exchange is done is set. The
+ * currently set future from an earlier key exchange is returned. The
returned future may or may not be fulfilled;
+ * if it isn't, there are still left-over pending packets to write from
the previous key exchange, which will be
+ * written once the new key exchange flushes pending packets.
+ *
+ * @return the previous {@link DefaultKeyExchangeFuture} indicating
whether all pending packets were flushed.
+ */
+ public DefaultKeyExchangeFuture initNewKeyExchange() {
+ return updateState(() -> {
+ kexFlushed = false;
+ DefaultKeyExchangeFuture oldFuture = kexFlushedFuture;
+ kexFlushedFuture = new
DefaultKeyExchangeFuture(session.toString(), session.getFutureLock());
+ return oldFuture;
+ });
+ }
+
+ /**
+ * To be called when the key exchange is done. If there are any pending
packets, returns a future that will be
+ * fulfilled when {@link #flushQueue(DefaultKeyExchangeFuture)} with that
future as argument has flushed all pending
+ * packets, if there are any.
+ *
+ * @return the current {@link DefaultKeyExchangeFuture} and the number of
currently pending packets
+ */
+ public SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>
terminateKeyExchange() {
+ return updateState(() -> {
+ int numPending = pendingPackets.size();
+ if (numPending == 0) {
+ kexFlushed = true;
+ }
+ return new SimpleImmutableEntry<Integer,
DefaultKeyExchangeFuture>(Integer.valueOf(numPending), kexFlushedFuture);
+ });
+ }
+
+ /**
+ * Pretends all pending packets had been written. To be called when the
{@link AbstractSession} closes.
+ */
+ public void shutdown() {
+ SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture> items =
updateState(() -> {
+ kexFlushed = true;
+ return new SimpleImmutableEntry<Integer, DefaultKeyExchangeFuture>(
+ Integer.valueOf(pendingPackets.size()),
+ kexFlushedFuture);
+ });
+ items.getValue().setValue(Boolean.valueOf(items.getKey().intValue() ==
0));
+ flushRunner.shutdownNow();
+ }
+
+ /**
+ * Writes a packet. If a key exchange is ongoing, only low-level messages
are written directly; all other messages
+ * are queued and will be written once {@link
#flushQueue(DefaultKeyExchangeFuture)} is called when the key exchange
+ * is done. Packets written while there are still pending packets to be
flushed will either be queued, too, or the
+ * calling thread will be blocked with the given timeout until all packets
have been flushed. Whether a write will
+ * be blocked is determined by {@link #isBlockAllowed(int)}.
+ * <p>
+ * If {@code timeout <= 0} or {@code unit == null}, a time-out of
"forever" is assumed. Note that a timeout applies
+ * only if the calling thread is blocked.
+ * </p>
+ *
+ * @param buffer packet to write
+ * @param timeout number of {@link TimeUnit}s to wait at most if the
calling thread is blocked
+ * @param unit {@link TimeUnit} of {@code timeout}
+ * @return an {@link IoWriteFuture} that will be fulfilled
once the packet has indeed been written.
+ * @throws IOException if an error occurs
+ */
+ public IoWriteFuture writePacket(Buffer buffer, long timeout, TimeUnit
unit) throws IOException {
+ // While exchanging key, queue high level packets.
+ byte[] bufData = buffer.array();
+ int cmd = bufData[buffer.rpos()] & 0xFF;
+ boolean enqueued = false;
+ boolean isLowLevelMessage = cmd <= SshConstants.SSH_MSG_KEX_LAST &&
cmd != SshConstants.SSH_MSG_SERVICE_REQUEST
+ && cmd != SshConstants.SSH_MSG_SERVICE_ACCEPT;
+ try {
+ if (isLowLevelMessage) {
+ // Low-level messages can always be sent.
+ return session.doWritePacket(buffer);
+ }
+ IoWriteFuture future = writeOrEnqueue(cmd, buffer, timeout, unit);
+ enqueued = future instanceof PendingWriteFuture;
+ return future;
+ } finally {
+ session.resetIdleTimeout();
+ if (!enqueued) {
+ try {
+ session.checkRekey();
+ } catch (GeneralSecurityException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("writePacket({}) failed ({}) to check
re-key: {}", session, e.getClass().getSimpleName(),
+ e.getMessage(), e);
+ }
+ throw ValidateUtils.initializeExceptionCause(new
ProtocolException(
+ "Failed (" + e.getClass().getSimpleName() + ")" +
" to check re-key necessity: " + e.getMessage()),
+ e);
+ } catch (Exception e) {
+ ExceptionUtils.rethrowAsIoException(e);
+ }
+ }
+ }
+ }
+
+ private IoWriteFuture writeOrEnqueue(int cmd, Buffer buffer, long timeout,
TimeUnit unit) throws IOException {
Review Comment:
Done.
Issue Time Tracking
-------------------
Worklog Id: (was: 761516)
Time Spent: 5h 20m (was: 5h 10m)
> Deadlock on disconnection at the end of key-exchange
> ----------------------------------------------------
>
> Key: SSHD-966
> URL: https://issues.apache.org/jira/browse/SSHD-966
> Project: MINA SSHD
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Francois Ferrand
> Assignee: Lyor Goldstein
> Priority: Major
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> We are using git-repo to download projects from Gerrit server, using SSH.
> Gerrit is in version 2.16.16. which uses SSHD 2.0.0 and Mina 2.0.17 with NIO2
> backend.
> One particularity of this setup is that git-repo creates a single control
> master channel, and then downloads *lots* of Git repositories (500
> repositories, some of them relatively large), with some degree of
> parallelism. This takes a long time, lots of data, and the multiplexed
> connections are handled by gerrit in multiple threads.
> In some cases, we experience a deadlock when an error happens at the end of
> the key exchange, while sending pending packets:
> {noformat}
> Warning, the following threads are deadlocked : SSH git-upload-pack /project1
> (myuser), sshd-SshServer[df5f657]-nio2-thread-3
> "SSH git-upload-pack /project1 (myuser)" prio=1 BLOCKED
>
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
> org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
> org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
> com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
> com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> "sshd-SshServer[df5f657]-nio2-thread-3" daemon prio=5 BLOCKED
>
> org.apache.sshd.common.channel.ChannelOutputStream.close(ChannelOutputStream.java:249)
> org.apache.sshd.common.util.io.IoUtils.closeQuietly(IoUtils.java:151)
>
> org.apache.sshd.server.channel.ChannelSession.doCloseImmediately(ChannelSession.java:205)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.channel.AbstractChannel.close(AbstractChannel.java:576)
>
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.util.closeable.ParallelCloseable.doClose(ParallelCloseable.java:65)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:56)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable$1.operationComplete(SequentialCloseable.java:45)
>
> org.apache.sshd.common.util.closeable.SequentialCloseable.doClose(SequentialCloseable.java:69)
>
> org.apache.sshd.common.util.closeable.SimpleCloseable.close(SimpleCloseable.java:63)
>
> org.apache.sshd.common.util.closeable.AbstractInnerCloseable.doCloseImmediately(AbstractInnerCloseable.java:46)
>
> org.apache.sshd.common.util.closeable.AbstractCloseable.close(AbstractCloseable.java:83)
>
> org.apache.sshd.common.session.helpers.AbstractSession.exceptionCaught(AbstractSession.java:973)
>
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.exceptionCaught(AbstractSessionIoHandler.java:53)
>
> org.apache.sshd.common.io.nio2.Nio2Session.exceptionCaught(Nio2Session.java:186)
>
> org.apache.sshd.common.io.nio2.Nio2Session.handleWriteCycleFailure(Nio2Session.java:460)
>
> org.apache.sshd.common.io.nio2.Nio2Session$2.onFailed(Nio2Session.java:415)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$failed$1(Nio2CompletionHandler.java:46)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$627/1874891543.run(Unknown
> Source)
> java.security.AccessController.doPrivileged(Native Method)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.failed(Nio2CompletionHandler.java:45)
> sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:128)
> sun.nio.ch.Invoker.invokeDirect(Invoker.java:157)
>
> sun.nio.ch.UnixAsynchronousSocketChannelImpl.implWrite(UnixAsynchronousSocketChannelImpl.java:736)
>
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:382)
>
> sun.nio.ch.AsynchronousSocketChannelImpl.write(AsynchronousSocketChannelImpl.java:399)
>
> org.apache.sshd.common.io.nio2.Nio2Session.doWriteCycle(Nio2Session.java:401)
>
> org.apache.sshd.common.io.nio2.Nio2Session.startWriting(Nio2Session.java:386)
>
> org.apache.sshd.common.io.nio2.Nio2Session.writePacket(Nio2Session.java:169)
>
> org.apache.sshd.common.session.helpers.AbstractSession.doWritePacket(AbstractSession.java:1181)
>
> org.apache.sshd.common.session.helpers.AbstractSession.sendPendingPackets(AbstractSession.java:910)
>
> org.apache.sshd.common.session.helpers.AbstractSession.handleNewKeys(AbstractSession.java:875)
>
> org.apache.sshd.common.session.helpers.AbstractSession.doHandleMessage(AbstractSession.java:606)
>
> org.apache.sshd.common.session.helpers.AbstractSession.handleMessage(AbstractSession.java:555)
>
> org.apache.sshd.common.session.helpers.AbstractSession.decode(AbstractSession.java:1527)
>
> org.apache.sshd.common.session.helpers.AbstractSession.messageReceived(AbstractSession.java:516)
>
> org.apache.sshd.common.session.helpers.AbstractSessionIoHandler.messageReceived(AbstractSessionIoHandler.java:63)
>
> org.apache.sshd.common.io.nio2.Nio2Session.handleReadCycleCompletion(Nio2Session.java:339)
>
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:318)
>
> org.apache.sshd.common.io.nio2.Nio2Session$1.onCompleted(Nio2Session.java:315)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.lambda$completed$0(Nio2CompletionHandler.java:38)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler$$Lambda$349/1690110070.run(Unknown
> Source)
> java.security.AccessController.doPrivileged(Native Method)
>
> org.apache.sshd.common.io.nio2.Nio2CompletionHandler.completed(Nio2CompletionHandler.java:37)
> sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
> sun.nio.ch.Invoker$2.run(Invoker.java:218)
>
> sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> "SSH git-upload-pack /project2 (myuser)" prio=1 BLOCKED
>
> org.apache.sshd.common.session.helpers.AbstractSession.writePacket(AbstractSession.java:1107)
>
> org.apache.sshd.common.channel.AbstractChannel.writePacket(AbstractChannel.java:798)
>
> org.apache.sshd.common.channel.ChannelOutputStream.flush(ChannelOutputStream.java:227)
>
> org.apache.sshd.common.channel.ChannelOutputStream.write(ChannelOutputStream.java:127)
>
> org.eclipse.jgit.transport.UploadPack$ResponseBufferedOutputStream.write(UploadPack.java:2183)
>
> org.eclipse.jgit.transport.SideBandOutputStream.writeBuffer(SideBandOutputStream.java:174)
>
> org.eclipse.jgit.transport.SideBandOutputStream.write(SideBandOutputStream.java:153)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.write(PackOutputStream.java:132)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs2(PackFile.java:614)
>
> org.eclipse.jgit.internal.storage.file.PackFile.copyAsIs(PackFile.java:433)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.copyObjectAsIs(WindowCursor.java:221)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjectImpl(PackWriter.java:1644)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObject(PackWriter.java:1621)
>
> org.eclipse.jgit.internal.storage.pack.PackOutputStream.writeObject(PackOutputStream.java:171)
>
> org.eclipse.jgit.internal.storage.file.WindowCursor.writeObjects(WindowCursor.java:229)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1609)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writeObjects(PackWriter.java:1597)
>
> org.eclipse.jgit.internal.storage.pack.PackWriter.writePack(PackWriter.java:1154)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:2133)
> org.eclipse.jgit.transport.UploadPack.sendPack(UploadPack.java:1947)
> org.eclipse.jgit.transport.UploadPack.service(UploadPack.java:971)
> org.eclipse.jgit.transport.UploadPack.upload(UploadPack.java:776)
> com.google.gerrit.sshd.commands.Upload.runImpl(Upload.java:77)
>
> com.google.gerrit.sshd.AbstractGitCommand.service(AbstractGitCommand.java:98)
>
> com.google.gerrit.sshd.AbstractGitCommand.access$000(AbstractGitCommand.java:31)
>
> com.google.gerrit.sshd.AbstractGitCommand$1.run(AbstractGitCommand.java:63)
> com.google.gerrit.sshd.BaseCommand$TaskThunk.run(BaseCommand.java:467)
>
> com.google.gerrit.server.logging.LoggingContextAwareRunnable.run(LoggingContextAwareRunnable.java:83)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> com.google.gerrit.server.git.WorkQueue$Task.run(WorkQueue.java:646)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
> {noformat}
> In AbstractSession.handleNewKeys(), there is a lock on {{pendingEvents}},
> while trying to send these packets; when sending the packets fails, an
> exception is generated, which causes the channel to get closed by calling
> ChannelOutputStream.close(), which is a synchronized method.
> At the same time, some other threads are trying to write data to the session:
> it calls ChannelOutputStream.write(), which is also a synchronized method,
> which calls AbstractSession.writePacket() which attempts to lock
> {{pendingPackets}} to queue the packets.
> *NOTE*: in our setup, we can reproduce this quite easily by increasing the
> parallelism in git-repo, reducing the values of waitTimeout
> (WAIT_FOR_SPACE_TIMEOUT), rekeyBytesLimit (REKEY_TIME_LIMIT) and
> rekeyTimeLimit (REKEY_TIME_LIMIT) configuration options.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]