http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 82ad683..c9d4cbb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -17,6 +17,21 @@ package org.apache.ignite.hadoop.fs.v2; +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,7 +42,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.InvalidPathException; @@ -36,57 +50,27 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; -import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream; -import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream; -import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream; import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate; import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper; import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; -import org.apache.ignite.internal.processors.igfs.IgfsPaths; import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser; -import static org.apache.ignite.igfs.IgfsMode.PROXY; import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES; import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE; import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR; @@ -163,15 +147,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** Default replication factor. */ private short dfltReplication; - /** Secondary URI string. */ - private URI secondaryUri; - - /** Mode resolver. */ - private IgfsModeResolver modeRslvr; - - /** The secondary file system factory. */ - private HadoopFileSystemFactoryDelegate factory; - /** Whether custom sequential reads before prefetch value is provided. */ private boolean seqReadsBeforePrefetchOverride; @@ -303,8 +278,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea grpBlockSize = handshake.blockSize(); - IgfsPaths paths = handshake.secondaryPaths(); - Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { @@ -318,59 +291,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea } else clientLog = IgfsLogger.disabledLogger(); - - try { - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); - } - catch (IgniteCheckedException ice) { - throw new IOException(ice); - } - - boolean initSecondary = paths.defaultMode() == PROXY; - - if (!initSecondary && paths.pathModes() != null) { - for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) { - IgfsMode mode = pathMode.getValue(); - - if (mode == PROXY) { - initSecondary = true; - - break; - } - } - } - - if (initSecondary) { - try { - HadoopFileSystemFactory factory0 = - (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); - - factory = HadoopDelegateUtils.fileSystemFactoryDelegate(getClass().getClassLoader(), factory0); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to get secondary file system factory.", e); - } - - if (factory == null) - throw new IOException("Failed to get secondary file system factory (did you set " + - IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " + - FileSystemConfiguration.class.getName() + "?)"); - - assert factory != null; - - factory.start(); - - try { - FileSystem secFs = (FileSystem)factory.get(user); - - secondaryUri = secFs.getUri(); - - A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); - } - catch (IOException e) { - throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); - } - } } finally { leaveBusy(); @@ -388,9 +308,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea if (clientLog.isLogEnabled()) clientLog.close(); - if (factory != null) - factory.stop(); - // Reset initialized resources. rmtClient = null; } @@ -414,19 +331,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** {@inheritDoc} */ @Override public boolean setReplication(Path f, short replication) throws IOException { - return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication); + return false; } /** {@inheritDoc} */ @Override public void setTimes(Path f, long mtime, long atime) throws IOException { - if (mode(f) == PROXY) - secondaryFileSystem().setTimes(f, mtime, atime); - else { - if (mtime == -1 && atime == -1) - return; + if (mtime == -1 && atime == -1) + return; - rmtClient.setTimes(convert(f), atime, mtime); - } + rmtClient.setTimes(convert(f), atime, mtime); } /** {@inheritDoc} */ @@ -443,13 +356,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { A.notNull(p, "p"); - if (mode(p) == PROXY) - secondaryFileSystem().setPermission(toSecondary(p), perm); - else { - if (rmtClient.update(convert(p), permission(perm)) == null) - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", perm=" + perm + ']'); - } + if (rmtClient.update(convert(p), permission(perm)) == null) + throw new IOException("Failed to set file permission (file not found?)" + + " [path=" + p + ", perm=" + perm + ']'); } finally { leaveBusy(); @@ -465,9 +374,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea enterBusy(); try { - if (mode(p) == PROXY) - secondaryFileSystem().setOwner(toSecondary(p), usr, grp); - else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr, + if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr, IgfsUtils.PROP_GROUP_NAME, grp)) == null) { throw new IOException("Failed to set file permission (file not found?)" + " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']'); @@ -486,50 +393,29 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize); - - if (clientLog.isLogEnabled()) { - // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f)); - long size = status != null ? status.getLen() : -1; + HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? + rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - long logId = IgfsLogger.nextId(); + long logId = -1; - clientLog.logOpen(logId, path, PROXY, bufSize, size); + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); - return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId)); - } - else - return is; + clientLog.logOpen(logId, path, bufSize, stream.length()); } - else { - HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ? - rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, mode, bufSize, stream.length()); - } - if (LOG.isDebugEnabled()) - LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + - ", bufSize=" + bufSize + ']'); + if (LOG.isDebugEnabled()) + LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + + ", bufSize=" + bufSize + ']'); - HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), - bufSize, LOG, clientLog, logId); + HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), + bufSize, LOG, clientLog, logId); - if (LOG.isDebugEnabled()) - LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); + if (LOG.isDebugEnabled()) + LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); - return new FSDataInputStream(igfsIn); - } + return new FSDataInputStream(igfsIn); } finally { leaveBusy(); @@ -561,80 +447,60 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); if (LOG.isDebugEnabled()) LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" + path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); - if (mode == PROXY) { - FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize, - replication, blockSize, progress); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - if (append) - clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. - else - clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - - return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm), - IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); - - // Create stream and close it in the 'finally' section if any sequential operation failed. - HadoopIgfsStreamDelegate stream; + Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm), + IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); - long logId = -1; + // Create stream and close it in the 'finally' section if any sequential operation failed. + HadoopIgfsStreamDelegate stream; - if (append) { - stream = rmtClient.append(path, create, permMap); + long logId = -1; - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); + if (append) { + stream = rmtClient.append(path, create, permMap); - clientLog.logAppend(logId, path, mode, bufSize); - } + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); + clientLog.logAppend(logId, path, bufSize); } - else { - stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, - permMap); - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); + if (LOG.isDebugEnabled()) + LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); + } + else { + stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, + permMap); - clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); - } + if (clientLog.isLogEnabled()) { + logId = IgfsLogger.nextId(); - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); + clientLog.logCreate(logId, path, overwrite, bufSize, replication, blockSize); } - assert stream != null; + if (LOG.isDebugEnabled()) + LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); + } + + assert stream != null; - HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, - clientLog, logId); + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, + clientLog, logId); - bufSize = Math.max(64 * 1024, bufSize); + bufSize = Math.max(64 * 1024, bufSize); - out = new BufferedOutputStream(igfsOut, bufSize); + out = new BufferedOutputStream(igfsOut, bufSize); - FSDataOutputStream res = new FSDataOutputStream(out, null, 0); + FSDataOutputStream res = new FSDataOutputStream(out, null, 0); - // Mark stream created successfully. - out = null; + // Mark stream created successfully. + out = null; - return res; - } + return res; } finally { // Close if failed during stream creation. @@ -661,15 +527,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea IgfsPath srcPath = convert(src); IgfsPath dstPath = convert(dst); - IgfsMode srcMode = modeRslvr.resolveMode(srcPath); - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, srcMode, dstPath); + clientLog.logRename(srcPath, dstPath); - if (srcMode == PROXY) - secondaryFileSystem().rename(toSecondary(src), toSecondary(dst)); - else - rmtClient.rename(srcPath, dstPath); + rmtClient.rename(srcPath, dstPath); } finally { leaveBusy(); @@ -685,19 +546,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, PROXY, recursive); - - return secondaryFileSystem().delete(toSecondary(f), recursive); - } - boolean res = rmtClient.delete(path, recursive); if (clientLog.isLogEnabled()) - clientLog.logDelete(path, mode, recursive); + clientLog.logDelete(path, recursive); return res; } @@ -708,16 +560,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** {@inheritDoc} */ @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { - // Checksum has effect for secondary FS only. - if (factory != null) - secondaryFileSystem().setVerifyChecksum(verifyChecksum); + // No-op. } /** {@inheritDoc} */ @Override public FileChecksum getFileChecksum(Path f) throws IOException { - if (mode(f) == PROXY) - return secondaryFileSystem().getFileChecksum(f); - return null; } @@ -729,52 +576,29 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - if (mode == PROXY) { - FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f)); + Collection<IgfsFile> list = rmtClient.listFiles(path); - if (arr == null) - throw new FileNotFoundException("File " + f + " does not exist."); + if (list == null) + throw new FileNotFoundException("File " + f + " does not exist."); - for (int i = 0; i < arr.length; i++) - arr[i] = toPrimary(arr[i]); + List<IgfsFile> files = new ArrayList<>(list); - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; + FileStatus[] arr = new FileStatus[files.size()]; - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); + for (int i = 0; i < arr.length; i++) + arr[i] = convert(files.get(i)); - clientLog.logListDirectory(path, PROXY, fileArr); - } - - return arr; - } - else { - Collection<IgfsFile> list = rmtClient.listFiles(path); - - if (list == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - List<IgfsFile> files = new ArrayList<>(list); - - FileStatus[] arr = new FileStatus[files.size()]; + if (clientLog.isLogEnabled()) { + String[] fileArr = new String[arr.length]; for (int i = 0; i < arr.length; i++) - arr[i] = convert(files.get(i)); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, mode, fileArr); - } + fileArr[i] = arr[i].getPath().toString(); - return arr; + clientLog.logListDirectory(path, fileArr); } + + return arr; } finally { leaveBusy(); @@ -789,20 +613,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea try { IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - if (mode == PROXY) { - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, PROXY); + rmtClient.mkdirs(path, permission(perm)); - secondaryFileSystem().mkdirs(toSecondary(f), perm); - } - else { - rmtClient.mkdirs(path, permission(perm)); - - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, mode); - } + if (clientLog.isLogEnabled()) + clientLog.logMakeDirectory(path); } finally { leaveBusy(); @@ -816,16 +631,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea enterBusy(); try { - if (mode(f) == PROXY) - return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f))); - else { - IgfsFile info = rmtClient.info(convert(f)); + IgfsFile info = rmtClient.info(convert(f)); - if (info == null) - throw new FileNotFoundException("File not found: " + f); + if (info == null) + throw new FileNotFoundException("File not found: " + f); - return convert(info); - } + return convert(info); } finally { leaveBusy(); @@ -841,25 +652,21 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea enterBusy(); try { - if (modeRslvr.resolveMode(igfsPath) == PROXY) - return secondaryFileSystem().getFileBlockLocations(path, start, len); - else { - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); - List<IgfsBlockLocation> affinity = new ArrayList<>( - rmtClient.affinity(igfsPath, start, len)); + List<IgfsBlockLocation> affinity = new ArrayList<>( + rmtClient.affinity(igfsPath, start, len)); - BlockLocation[] arr = new BlockLocation[affinity.size()]; + BlockLocation[] arr = new BlockLocation[affinity.size()]; - for (int i = 0; i < arr.length; i++) - arr[i] = convert(affinity.get(i)); + for (int i = 0; i < arr.length; i++) + arr[i] = convert(affinity.get(i)); - if (LOG.isDebugEnabled()) - LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + - (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); + if (LOG.isDebugEnabled()) + LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + + (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); - return arr; - } + return arr; } finally { leaveBusy(); @@ -867,77 +674,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea } /** - * Resolve path mode. - * - * @param path HDFS path. - * @return Path mode. - */ - public IgfsMode mode(Path path) { - return modeRslvr.resolveMode(convert(path)); - } - - /** - * Convert the given path to path acceptable by the primary file system. - * - * @param path Path. - * @return Primary file system path. - */ - private Path toPrimary(Path path) { - return convertPath(path, getUri()); - } - - /** - * Convert the given path to path acceptable by the secondary file system. - * - * @param path Path. - * @return Secondary file system path. - */ - private Path toSecondary(Path path) { - assert factory != null; - assert secondaryUri != null; - - return convertPath(path, secondaryUri); - } - - /** - * Convert path using the given new URI. - * - * @param path Old path. - * @param newUri New URI. - * @return New path. - */ - private Path convertPath(Path path, URI newUri) { - assert newUri != null; - - if (path != null) { - URI pathUri = path.toUri(); - - try { - return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, - pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); - } - catch (URISyntaxException e) { - throw new IgniteException("Failed to construct secondary file system path from the primary file " + - "system path: " + path, e); - } - } - else - return null; - } - - /** - * Convert a file status obtained from the secondary file system to a status of the primary file system. - * - * @param status Secondary file system status. - * @return Primary file system status. - */ - private FileStatus toPrimary(FileStatus status) { - return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(), - status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), - status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; - } - - /** * Convert IGFS path into Hadoop path. * * @param path IGFS path. @@ -1065,15 +801,4 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea public String user() { return user; } - - /** - * Gets cached or creates a {@link FileSystem}. - * - * @return The secondary file system. - */ - private FileSystem secondaryFileSystem() throws IOException{ - assert factory != null; - - return (FileSystem)factory.get(user); - } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java index fe6492e..9c7febf 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.delegate; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -125,8 +126,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); } - //Result is not used in case of secondary FS. - return null; + return info(path); } /** {@inheritDoc} */ @@ -300,7 +300,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco final Map<String, String> props = properties(status); - return new IgfsFile() { + return new IgfsFileImpl(new IgfsFile() { @Override public IgfsPath path() { return path; } @@ -353,7 +353,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco @Override public Map<String, String> properties() { return props; } - }; + }, 0); } catch (FileNotFoundException ignore) { return null; @@ -400,6 +400,9 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco return blks; } + catch (FileNotFoundException ignored) { + return Collections.emptyList(); + } catch (IOException e) { throw handleSecondaryFsError(e, "Failed affinity for path: " + path); } http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java index 2294134..23bfc4f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsEx.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.hadoop.impl.igfs; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; import org.jetbrains.annotations.Nullable; import java.io.IOException; @@ -91,4 +93,10 @@ public interface HadoopIgfsEx extends HadoopIgfs { * @return the user name. */ public String user(); + + /** + * @return Mode resolver. + * @throws IgniteCheckedException On error. + */ + IgfsModeResolver modeResolver() throws IgniteCheckedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java index 0ca2e56..0577c73 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java @@ -42,6 +42,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsImpl; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; @@ -210,10 +212,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public IgfsHandshakeResponse apply() { igfs.clientLogDirectory(logDir); - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); - } - }); + return new IgfsHandshakeResponse(igfs.name(), igfs.groupBlockSize(), igfs.globalSampling()); + } + }); } /** @@ -660,4 +661,18 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { @Override public String user() { return user; } + + /** {@inheritDoc} */ + @Override public IgfsModeResolver modeResolver() throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsModeResolver>() { + @Override public IgfsModeResolver apply() { + return ((IgfsImpl)igfs).modeResolver(); + } + }); + } + catch (IllegalStateException ignored) { + throw new HadoopIgfsCommunicationException("Failed to get mode resolver because Grid is stopping"); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java index 88f26f1..2780966 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java @@ -28,11 +28,13 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.igfs.common.IgfsControlResponse; import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest; import org.apache.ignite.internal.igfs.common.IgfsMessage; +import org.apache.ignite.internal.igfs.common.IgfsModeResolverRequest; import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest; import org.apache.ignite.internal.igfs.common.IgfsStatusRequest; import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest; import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -52,6 +54,7 @@ import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO; import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES; import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS; import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MODE_RESOLVER; import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND; import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE; import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ; @@ -103,6 +106,10 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure(); + /** Expected result is {@code IgfsFile}. */ + private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, + IgfsModeResolver> MODE_RESOLVER_RES = createClosure(); + /** IGFS name. */ private final String igfs; @@ -518,4 +525,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener @Override public String user() { return userName; } + + /** {@inheritDoc} */ + @Override public IgfsModeResolver modeResolver() throws IgniteCheckedException { + return io.send(new IgfsModeResolverRequest()).chain(MODE_RESOLVER_RES).get(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java index 6fa5d60..bee7dc2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java @@ -32,6 +32,7 @@ import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -272,6 +273,19 @@ public class HadoopIgfsWrapper implements HadoopIgfs { } /** + * @return Mode resolver. + * @throws IOException On error. + */ + public IgfsModeResolver modeResolver() throws IOException{ + return withReconnectHandling(new FileSystemClosure<IgfsModeResolver>() { + @Override public IgfsModeResolver apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.modeResolver(); + } + }); + } + + /** * Execute closure which is not path-specific. * * @param clo Closure. http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java index 6b5690c..1793a05 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java @@ -127,9 +127,8 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { // Create remote instance. FileSystem fs = FileSystem.get(URI.create("igfs://primary@127.0.0.1:10500/"), baseConfiguration()); - // Ensure lifecycle callback was invoked. - assert START_CNT.get() == 2; - assert STOP_CNT.get() == 0; + assertEquals(1, START_CNT.get()); + assertEquals(0, STOP_CNT.get()); // Check file system operations. assert fs.exists(PATH_DUAL); @@ -148,17 +147,16 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { assert secondary.exists(IGFS_PATH_PROXY); assert fs.exists(PATH_PROXY); - // Close file system and ensure that associated factory was notified. fs.close(); - assert START_CNT.get() == 2; - assert STOP_CNT.get() == 1; + assertEquals(1, START_CNT.get()); + assertEquals(0, STOP_CNT.get()); // Stop primary node and ensure that base factory was notified. G.stop(primary.context().kernalContext().grid().name(), true); - assert START_CNT.get() == 2; - assert STOP_CNT.get() == 2; + assertEquals(1, START_CNT.get()); + assertEquals(1, STOP_CNT.get()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index f70838a..2214b5b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -71,6 +71,7 @@ import org.apache.ignite.igfs.IgfsIpcEndpointType; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; +import org.apache.ignite.internal.processors.igfs.IgfsModeResolver; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; @@ -2108,6 +2109,15 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA } /** + * @throws Exception If failed. + */ + public void testModeResolver() throws Exception { + IgfsModeResolver mr = ((IgniteHadoopFileSystem)fs).getModeResolver(); + + assertEquals(mode, mr.resolveMode(IgfsPath.ROOT)); + } + + /** * Verifies that client reconnects after connection to the server has been lost (multithreaded mode). * * @throws Exception If error occurs. http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java index b61492a..6de033f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemLoggerSelfTest.java @@ -31,7 +31,6 @@ import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; import static org.apache.ignite.internal.igfs.common.IgfsLogger.DELIM_FIELD; import static org.apache.ignite.internal.igfs.common.IgfsLogger.DELIM_FIELD_VAL; import static org.apache.ignite.internal.igfs.common.IgfsLogger.HDR; @@ -123,7 +122,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest otherLog.close(); - log.logDelete(PATH, PRIMARY, false); + log.logDelete(PATH, false); log.close(); @@ -166,7 +165,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest public void testLogRead() throws Exception { IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); - log.logOpen(1, PATH, PRIMARY, 2, 3L); + log.logOpen(1, PATH, 2, 3L); log.logRandomRead(1, 4L, 5); log.logSeek(1, 6L); log.logSkip(1, 7L); @@ -177,7 +176,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest log.close(); checkLog( - new SB().a(U.jvmPid() + d() + TYPE_OPEN_IN + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 1 + d() + 2 + + new SB().a(U.jvmPid() + d() + TYPE_OPEN_IN + d() + PATH_STR_ESCAPED + d() + d() + 1 + d() + 2 + d() + 3 + d(14)).toString(), new SB().a(U.jvmPid() + d() + TYPE_RANDOM_READ + d(3) + 1 + d(7) + 4 + d() + 5 + d(8)).toString(), new SB().a(U.jvmPid() + d() + TYPE_SEEK + d(3) + 1 + d(7) + 6 + d(9)).toString(), @@ -196,16 +195,16 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest public void testLogWrite() throws Exception { IgfsLogger log = IgfsLogger.logger(ENDPOINT, IGFS_NAME, LOG_DIR, 10); - log.logCreate(1, PATH, PRIMARY, true, 2, new Integer(3).shortValue(), 4L); - log.logAppend(2, PATH, PRIMARY, 8); + log.logCreate(1, PATH, true, 2, new Integer(3).shortValue(), 4L); + log.logAppend(2, PATH, 8); log.logCloseOut(2, 9L, 10L, 11); log.close(); checkLog( - new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 1 + d() + + new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + d() + 1 + d() + 2 + d(2) + 0 + d() + 1 + d() + 3 + d() + 4 + d(10)).toString(), - new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + PRIMARY + d() + 2 + d() + + new SB().a(U.jvmPid() + d() + TYPE_OPEN_OUT + d() + PATH_STR_ESCAPED + d() + d() + 2 + d() + 8 + d(2) + 1 + d(13)).toString(), new SB().a(U.jvmPid() + d() + TYPE_CLOSE_OUT + d(3) + 2 + d(11) + 9 + d() + 10 + d() + 11 + d(3)) .toString() @@ -225,20 +224,20 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest String file1 = "/dir3/file1.test"; String file2 = "/dir3/file1.test"; - log.logMakeDirectory(PATH, PRIMARY); - log.logRename(PATH, PRIMARY, new IgfsPath(newFile)); - log.logListDirectory(PATH, PRIMARY, new String[] { file1, file2 }); - log.logDelete(PATH, PRIMARY, false); + log.logMakeDirectory(PATH); + log.logRename(PATH, new IgfsPath(newFile)); + log.logListDirectory(PATH, new String[] { file1, file2 }); + log.logDelete(PATH, false); log.close(); checkLog( - new SB().a(U.jvmPid() + d() + TYPE_DIR_MAKE + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(17)).toString(), - new SB().a(U.jvmPid() + d() + TYPE_RENAME + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(15) + newFile + + new SB().a(U.jvmPid() + d() + TYPE_DIR_MAKE + d() + PATH_STR_ESCAPED + d() + d(17)).toString(), + new SB().a(U.jvmPid() + d() + TYPE_RENAME + d() + PATH_STR_ESCAPED + d() + d(15) + newFile + d(2)).toString(), - new SB().a(U.jvmPid() + d() + TYPE_DIR_LIST + d() + PATH_STR_ESCAPED + d() + PRIMARY + d(17) + file1 + + new SB().a(U.jvmPid() + d() + TYPE_DIR_LIST + d() + PATH_STR_ESCAPED + d() + d(17) + file1 + DELIM_FIELD_VAL + file2).toString(), - new SB().a(U.jvmPid() + d() + TYPE_DELETE + d(1) + PATH_STR_ESCAPED + d() + PRIMARY + d(16) + 0 + + new SB().a(U.jvmPid() + d() + TYPE_DELETE + d(1) + PATH_STR_ESCAPED + d() + d(16) + 0 + d()).toString() ); } @@ -247,6 +246,7 @@ public class IgniteHadoopFileSystemLoggerSelfTest extends IgfsCommonAbstractTest * Ensure that log file has only the following lines. * * @param lines Expected lines. + * @throws Exception If failed. */ private void checkLog(String... lines) throws Exception { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(LOG_FILE))); http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java deleted file mode 100644 index e710b97..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.igfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; -import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; -import org.apache.ignite.igfs.IgfsIpcEndpointType; -import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; - -import java.net.URI; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; - -/** - * Ensures correct modes resolution for SECONDARY paths. - */ -public class IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest extends IgfsCommonAbstractTest { - /** File system. */ - private IgniteHadoopFileSystem fs; - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - U.closeQuiet(fs); - - fs = null; - - G.stopAll(true); - } - - /** - * Perform initial startup. - * - * @param initDfltPathModes WHether to initialize default path modes. - * @throws Exception If failed. - */ - @SuppressWarnings({"NullableProblems", "unchecked"}) - private void startUp(boolean initDfltPathModes) throws Exception { - startUpSecondary(); - - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setName("igfs"); - igfsCfg.setBlockSize(512 * 1024); - igfsCfg.setInitializeDefaultPathModes(initDfltPathModes); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.TCP); - endpointCfg.setPort(10500); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - igfsCfg.setManagementPort(-1); - igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( - "igfs://igfs-secondary@127.0.0.1:11500/", - "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml")); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setNearConfiguration(null); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - igfsCfg.setDataCacheConfiguration(dataCacheCfg); - igfsCfg.setMetaCacheConfiguration(metaCacheCfg); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setIgniteInstanceName("igfs-grid"); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - - G.start(cfg); - - Configuration fsCfg = new Configuration(); - - fsCfg.addResource(U.resolveIgniteUrl("modules/core/src/test/config/hadoop/core-site-loopback.xml")); - - fsCfg.setBoolean("fs.igfs.impl.disable.cache", true); - - fs = (IgniteHadoopFileSystem)FileSystem.get(new URI("igfs://igfs@/"), fsCfg); - } - - /** - * Startup secondary file system. - * - * @throws Exception If failed. - */ - @SuppressWarnings("unchecked") - private void startUpSecondary() throws Exception { - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setName("igfs-secondary"); - igfsCfg.setBlockSize(512 * 1024); - igfsCfg.setDefaultMode(PRIMARY); - - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.TCP); - endpointCfg.setPort(11500); - - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setNearConfiguration(null); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); - cacheCfg.setBackups(0); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - igfsCfg.setDataCacheConfiguration(cacheCfg); - igfsCfg.setMetaCacheConfiguration(metaCacheCfg); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setIgniteInstanceName("igfs-grid-secondary"); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - - G.start(cfg); - } - - /** - * Test scenario when defaults are initialized. - * - * @throws Exception If failed. - */ - public void testDefaultsInitialized() throws Exception { - check(true); - } - - /** - * Test scenario when defaults are not initialized. - * - * @throws Exception If failed. - */ - public void testDefaultsNotInitialized() throws Exception { - check(false); - } - - /** - * Actual check. - * - * @param initDfltPathModes Whether to initialize default path modes. - * @throws Exception If failed. - */ - private void check(boolean initDfltPathModes) throws Exception { - startUp(initDfltPathModes); - - assertEquals(initDfltPathModes, fs.hasSecondaryFileSystem()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/feba9534/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index b012083..576d8db 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSy import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest; import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest; -import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopCommandLineTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopFileSystemsTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopGroupingTest; @@ -165,8 +164,6 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalToClientDualAsyncSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalToClientProxySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemClientSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoggerStateSelfTest.class.getName())));