IGNITE-1477: Fixed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0e1ac18 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0e1ac18 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0e1ac18 Branch: refs/heads/master Commit: c0e1ac1842df19a4de83dbbcc99090b43371c913 Parents: c065512 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Sep 15 10:25:09 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Sep 15 10:25:09 2015 +0300 ---------------------------------------------------------------------- .../hadoop/igfs/HadoopIgfsWrapper.java | 94 +++++++++++--------- 1 file changed, 53 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e1ac18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index abbb142..01189f7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -339,7 +340,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @return Delegate. */ private Delegate delegate() throws HadoopIgfsCommunicationException { - Exception err = null; + // These fields will contain possible exceptions from shmem and TCP endpoints. + Exception errShmem = null; + Exception errTcp = null; // 1. If delegate is set, return it immediately. Delegate curDelegate = delegateRef.get(); @@ -357,8 +360,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs()); } - catch (Exception e) { - err = e; + catch (Exception ignore) { + // No-op. } } else { @@ -368,8 +371,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { break; } - catch (Exception e) { - err = e; + catch (Exception ignore) { + // No-op. } } } @@ -388,57 +391,54 @@ public class HadoopIgfsWrapper implements HadoopIgfs { hadoop.close(true); if (log.isDebugEnabled()) - log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); - - err = e; + log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e); } } } // 3. Try connecting using shmem. - if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) { - if (curDelegate == null && !U.isWindows()) { - HadoopIgfsEx hadoop = null; + boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false); - try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); + if (curDelegate == null && !skipLocShmem && !U.isWindows()) { + HadoopIgfsEx hadoop = null; - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); + try { + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc local IGFS using shmem.", e); + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); - err = e; - } + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e); + + errShmem = e; } } // 4. Try local TCP connection. boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); - if (!skipLocTcp) { - if (curDelegate == null) { - HadoopIgfsEx hadoop = null; + if (curDelegate == null && !skipLocTcp) { + HadoopIgfsEx hadoop = null; - try { - hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log, userName); + try { + hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); - curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); - } - catch (IOException | IgniteCheckedException e) { - if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); - if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc local IGFS using TCP.", e); + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); - err = e; - } + errTcp = e; } } @@ -457,9 +457,10 @@ public class HadoopIgfsWrapper implements HadoopIgfs { hadoop.close(true); if (log.isDebugEnabled()) - log.debug("Failed to connect to out-proc remote IGFS using TCP.", e); + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); - err = e; + errTcp = e; } } @@ -469,8 +470,19 @@ public class HadoopIgfsWrapper implements HadoopIgfs { return curDelegate; } - else - throw new HadoopIgfsCommunicationException("Failed to connect to IGFS: " + endpoint, err); + else { + SB errMsg = new SB("Failed to connect to IGFS [endpoint=" + authority + ", attempts=["); + + if (errShmem != null) + errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], "); + + errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] "); + + errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " + + "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint)."); + + throw new HadoopIgfsCommunicationException(errMsg.toString()); + } } /**