Alon Bar-Lev has uploaded a new change for review.

Change subject: host-deploy: SSHDialog: close streams after sink is stopped
......................................................................

host-deploy: SSHDialog: close streams after sink is stopped

This fixes the Pipe Closed exception caused by the try-with-resources
that was introduced in the main thread, which closed the input stream
before the reader thread could terminate properly.

This also reverts 4d544a99, as that problem has the same root cause and
I had read the Java sources incorrectly: the buffered input stream uses
byte-by-byte reads, not buffered reads.

Add a unit test to reproduce this issue.

Bug-Url: https://bugzilla.redhat.com/show_bug.cgi?id=1084911
Change-Id: I180a9883eefac57c9b528ff7b6df0eda11149255
Signed-off-by: Alon Bar-Lev <[email protected]>
---
M 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java
M 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
M 
backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java
3 files changed, 167 insertions(+), 75 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/87/26587/1

diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java
index 4b05c34..e6b4d2d 100644
--- 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/OVirtNodeUpgrade.java
@@ -28,7 +28,6 @@
 
     private static final int BUFFER_SIZE = 10 * 1024;
     private static final int THREAD_JOIN_TIMEOUT = 20 * 1000;
-    private static final String ARTIFICIAL_EOF = "component='RHEV_INSTALL'";
 
     private static final Log log = LogFactory.getLog(OVirtNodeUpgrade.class);
 
@@ -53,29 +52,12 @@
         boolean error = false;
         try {
             String line;
-            boolean eof = false;
             while (
-                !eof &&
                 _incoming != null &&
                 (line = _incoming.readLine()) != null
             ) {
                 log.infoFormat("update from host {0}: {1}", 
_vds.getHostName(), line);
                 error = _messages.postOldXmlFormat(line) || error;
-
-                /*
-                 * apply artificial end-of-file
-                 *
-                 * there is no way in java to use non blocking
-                 * InputStream.
-                 *
-                 * there is no way to interrupt the blocking.
-                 *
-                 * and we cannot terminate thread as the connection
-                 * pool will get messy.
-                 *
-                 * so we should detect eof based on data.
-                 */
-                eof = line.contains(ARTIFICIAL_EOF);
             }
 
             if (error) {
diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
index 98959f2..4abab39 100644
--- 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
+++ 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/ssh/SSHDialog.java
@@ -291,71 +291,73 @@
             final OutputStream poutStdout = new PipedOutputStream(pinStdout);
             final ByteArrayOutputStream stderr = new 
ConstraintByteArrayOutputStream(1024);
         ) {
-            List<InputStream> stdinList;
-            if (initial == null) {
-                stdinList = new LinkedList<InputStream>();
-            }
-            else {
-                stdinList = new 
LinkedList<InputStream>(Arrays.asList(initial));
-            }
-            stdinList.add(pinStdin);
+            try {
+                List<InputStream> stdinList;
+                if (initial == null) {
+                    stdinList = new LinkedList<InputStream>();
+                }
+                else {
+                    stdinList = new 
LinkedList<InputStream>(Arrays.asList(initial));
+                }
+                stdinList.add(pinStdin);
 
-            sink.setControl(
-                new Control() {
-                    @Override
-                    public void close() throws IOException {
-                        if (_client != null) {
-                            _client.close();
+                sink.setControl(
+                    new Control() {
+                        @Override
+                        public void close() throws IOException {
+                            if (_client != null) {
+                                _client.close();
+                            }
                         }
                     }
-                }
-            );
-            sink.setStreams(pinStdout, poutStdin);
-            sink.start();
-
-            try {
-                _client.executeCommand(
-                    command,
-                    new 
SequenceInputStream(Collections.enumeration(stdinList)),
-                    poutStdout,
-                    stderr
                 );
-            }
-            catch (Exception e) {
-                if (stderr.size() == 0) {
-                    throw e;
-                }
+                sink.setStreams(pinStdout, poutStdin);
+                sink.start();
 
-                log.error(
-                    "Swallowing exception as preferring stderr",
-                    e
-                );
-            }
-            finally {
-                if (stderr.size() > 0) {
-                    throw new RuntimeException(
-                        String.format(
-                            "Unexpected error during execution: %1$s",
-                            new String(stderr.toByteArray(), 
Charset.forName("UTF-8"))
-                        )
+                try {
+                    _client.executeCommand(
+                        command,
+                        new 
SequenceInputStream(Collections.enumeration(stdinList)),
+                        poutStdout,
+                        stderr
                     );
                 }
+                catch (Exception e) {
+                    if (stderr.size() == 0) {
+                        throw e;
+                    }
+
+                    log.error(
+                        "Swallowing exception as preferring stderr",
+                        e
+                    );
+                }
+                finally {
+                    if (stderr.size() > 0) {
+                        throw new RuntimeException(
+                            String.format(
+                                "Unexpected error during execution: %1$s",
+                                new String(stderr.toByteArray(), 
Charset.forName("UTF-8"))
+                            )
+                        );
+                    }
+                }
             }
-        }
-        catch (Exception e) {
-            log.error(
-                String.format(
-                    "SSH error running command %1$s:'%2$s'",
-                    _client.getDisplayHost(),
-                    command
-                ),
-                e
-            );
-            throw e;
-        }
-        finally {
-            sink.stop();
-            sink.setStreams(null, null);
+            catch (Exception e) {
+                log.error(
+                    String.format(
+                        "SSH error running command %1$s:'%2$s'",
+                        _client.getDisplayHost(),
+                        command
+                    ),
+                    e
+                );
+                throw e;
+            }
+            finally {
+                sink.stop();
+                sink.setStreams(null, null);
+            }
         }
 
         log.debug("execute leave");
diff --git 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java
 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java
index 596cd5a..3e85e7e 100644
--- 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java
+++ 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/ssh/SSHDialogTest.java
@@ -422,4 +422,112 @@
         );
         sink.exception();
     }
+
+    private static class ReaderSink
+    implements Runnable, SSHDialog.Sink {
+
+        private SSHDialog.Control _control;
+        private BufferedReader _incoming;
+        private PrintWriter _outgoing;
+        private Throwable _throwable;
+        private Thread _thread;
+        private int _delay;
+        private String _last;
+
+        public ReaderSink(int delay) {
+            _thread = new Thread(this);
+            _delay = delay;
+        }
+
+        public String getLast() {
+            return _last;
+        }
+
+        public void exception() throws Throwable {
+            if (_throwable != null) {
+                throw _throwable;
+            }
+        }
+
+        @Override
+        public void setControl(SSHDialog.Control control) {
+            _control = control;
+        }
+
+        @Override
+        public void setStreams(InputStream incoming, OutputStream outgoing) {
+            _incoming = incoming == null ? null : new BufferedReader(
+                new InputStreamReader(
+                    incoming,
+                    Charset.forName("UTF-8")
+                ),
+                BUFFER_SIZE
+            );
+            _outgoing = outgoing == null ? null : new PrintWriter(
+                new OutputStreamWriter(
+                    outgoing,
+                    Charset.forName("UTF-8")
+                ),
+                true
+            );
+        }
+
+        @Override
+        public void start() {
+            _thread.start();
+        }
+
+        @Override
+        public void stop() {
+            if (_thread != null) {
+                while(true) {
+                    try {
+                        _thread.join();
+                        break;
+                    }
+                    catch (InterruptedException e) {}
+                }
+                _thread = null;
+            }
+        }
+
+        public void run()  {
+            try {
+                String l;
+                while ((l = _incoming.readLine()) != null) {
+                    _last = l;
+                    Thread.sleep(_delay);
+                }
+            }
+            catch (Throwable t) {
+                if (_throwable == null) {
+                    _throwable = t;
+                }
+            }
+            finally {
+                try {
+                    _control.close();
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testDelay() throws Throwable {
+        ReaderSink sink = new ReaderSink(10);
+        _sshdialog.setSoftTimeout(60*1000);
+        _sshdialog.setHardTimeout(60*1000);
+        _sshdialog.connect();
+        _sshdialog.authenticate();
+        _sshdialog.executeCommand(
+            sink,
+            "x=0;while [ $x -lt 100 ]; do echo line$x; x=$(($x+1)); done",
+            null
+        );
+        sink.exception();
+        assertEquals("line99", sink.getLast());
+    }
 }


-- 
To view, visit http://gerrit.ovirt.org/26587
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I180a9883eefac57c9b528ff7b6df0eda11149255
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: ovirt-engine-3.4
Gerrit-Owner: Alon Bar-Lev <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to