Alon Bar-Lev has uploaded a new change for review. Change subject: host-deploy: SSHDialog: close streams after sink is stopped ......................................................................
host-deploy: SSHDialog: close streams after sink is stopped this solves Pipe Closed exception happens because of try-with-resources that was introduced at main thread, this closed the input stream before the reader thread could terminate properly. this also reverts 4d544a99, as it happens from same root cause, and I read the java sources incorrectly, the buffered input stream uses byte-by-byte read and not buffered. add unit test to reproduce this issue. Bug-Url: https://bugzilla.redhat.com/show_bug.cgi?id=1084911 Change-Id: I180a9883eefac57c9b528ff7b6df0eda11149255 Signed-off-by: Alon Bar-Lev <[email protected]> --- M backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java M backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java M backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java 3 files changed, 167 insertions(+), 75 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/87/26587/1 diff --git a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java index 4b05c34..e6b4d2d 100644 --- a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java +++ b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java @@ -28,7 +28,6 @@ private static final int BUFFER_SIZE = 10 * 1024; private static final int THREAD_JOIN_TIMEOUT = 20 * 1000; - private static final String ARTIFICIAL_EOF = "component='RHEV_INSTALL'"; private static final Log log = LogFactory.getLog(OVirtNodeUpgrade.class); @@ -53,29 +52,12 @@ boolean error = false; try { String line; - boolean eof = false; while ( - !eof && _incoming != null && (line = _incoming.readLine()) != null ) { log.infoFormat("update from host {0}: {1}", _vds.getHostName(), line); error = _messages.postOldXmlFormat(line) || error; - - /* - * apply artificial end-of-file - * - * there is no way in java to use non blocking - * InputStream. - * - * there is no way to interrupt the blocking. - * - * and we cannot terminate thread as the connection - * pool will get messy. - * - * so we should detect eof based on data. - */ - eof = line.contains(ARTIFICIAL_EOF); } if (error) { diff --git a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java index 98959f2..4abab39 100644 --- a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java +++ b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java @@ -291,71 +291,73 @@ final OutputStream poutStdout = new PipedOutputStream(pinStdout); final ByteArrayOutputStream stderr = new ConstraintByteArrayOutputStream(1024); ) { - List<InputStream> stdinList; - if (initial == null) { - stdinList = new LinkedList<InputStream>(); - } - else { - stdinList = new LinkedList<InputStream>(Arrays.asList(initial)); - } - stdinList.add(pinStdin); + try { + List<InputStream> stdinList; + if (initial == null) { + stdinList = new LinkedList<InputStream>(); + } + else { + stdinList = new LinkedList<InputStream>(Arrays.asList(initial)); + } + stdinList.add(pinStdin); - sink.setControl( - new Control() { - @Override - public void close() throws IOException { - if (_client != null) { - _client.close(); + sink.setControl( + new Control() { + @Override + public void close() throws IOException { + if (_client != null) { + _client.close(); + } } } - } - ); - sink.setStreams(pinStdout, poutStdin); - sink.start(); - - try { - _client.executeCommand( - command, - new SequenceInputStream(Collections.enumeration(stdinList)), - poutStdout, - stderr ); - } - catch (Exception e) { - if (stderr.size() == 0) { - throw e; - } + sink.setStreams(pinStdout, poutStdin); + sink.start(); - log.error( - "Swallowing exception as preferring stderr", - e - ); - } - finally { - if (stderr.size() > 0) { - throw new RuntimeException( - String.format( - "Unexpected error during execution: %1$s", - new String(stderr.toByteArray(), Charset.forName("UTF-8")) - ) + try { + _client.executeCommand( + command, + new SequenceInputStream(Collections.enumeration(stdinList)), + poutStdout, + stderr ); } + catch (Exception e) { + if (stderr.size() == 0) { + throw e; + } + + log.error( + "Swallowing exception as preferring stderr", + e + ); + } + finally { + if (stderr.size() > 0) { + throw new RuntimeException( + String.format( + "Unexpected error during execution: %1$s", + new String(stderr.toByteArray(), Charset.forName("UTF-8")) + ) + ); + } + } } - } - catch (Exception e) { - log.error( - String.format( - "SSH error running command %1$s:'%2$s'", - _client.getDisplayHost(), - command - ), - e - ); - throw e; - } - finally { - sink.stop(); - sink.setStreams(null, null); + catch (Exception e) { + log.error( + String.format( + "SSH error running command %1$s:'%2$s'", + _client.getDisplayHost(), + command + ), + e + ); + throw e; + } + finally { + sink.stop(); + sink.setStreams(null, null); + } } log.debug("execute leave"); diff --git a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java index 596cd5a..3e85e7e 100644 --- a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java +++ b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java @@ -422,4 +422,112 @@ ); sink.exception(); } + + private static class ReaderSink + implements Runnable, SSHDialog.Sink { + + private SSHDialog.Control _control; + private BufferedReader _incoming; + private PrintWriter _outgoing; + private Throwable _throwable; + private Thread _thread; + private int _delay; + private String _last; + + public ReaderSink(int delay) { + _thread = new Thread(this); + _delay = delay; + } + + public String getLast() { + return _last; + } + + public void exception() throws Throwable { + if (_throwable != null) { + throw _throwable; + } + } + + @Override + public void setControl(SSHDialog.Control control) { + _control = control; + } + + @Override + public void setStreams(InputStream incoming, OutputStream outgoing) { + _incoming = incoming == null ? null : new BufferedReader( + new InputStreamReader( + incoming, + Charset.forName("UTF-8") + ), + BUFFER_SIZE + ); + _outgoing = outgoing == null ? null : new PrintWriter( + new OutputStreamWriter( + outgoing, + Charset.forName("UTF-8") + ), + true + ); + } + + @Override + public void start() { + _thread.start(); + } + + @Override + public void stop() { + if (_thread != null) { + while(true) { + try { + _thread.join(); + break; + } + catch (InterruptedException e) {} + } + _thread = null; + } + } + + public void run() { + try { + String l; + while ((l = _incoming.readLine()) != null) { + _last = l; + Thread.sleep(_delay); + } + } + catch (Throwable t) { + if (_throwable == null) { + _throwable = t; + } + } + finally { + try { + _control.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + @Test + public void testDelay() throws Throwable { + ReaderSink sink = new ReaderSink(10); + _sshdialog.setSoftTimeout(60*1000); + _sshdialog.setHardTimeout(60*1000); + _sshdialog.connect(); + _sshdialog.authenticate(); + _sshdialog.executeCommand( + sink, + "x=0;while [ $x -lt 100 ]; do echo line$x; x=$(($x+1)); done", + null + ); + sink.exception(); + assertEquals("line99", sink.getLast()); + } } -- To view, visit http://gerrit.ovirt.org/26587 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I180a9883eefac57c9b528ff7b6df0eda11149255 Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: ovirt-engine-3.4 Gerrit-Owner: Alon Bar-Lev <[email protected]> _______________________________________________ Engine-patches mailing list [email protected] http://lists.ovirt.org/mailman/listinfo/engine-patches
