Repository: ignite Updated Branches: refs/heads/ignite-2206 [created] 31d3289df
2206 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a3f9dd3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a3f9dd3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a3f9dd3 Branch: refs/heads/ignite-2206 Commit: 7a3f9dd35ccf487e64318c8bf76925ad0bac0339 Parents: 2848680 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Mon Dec 21 17:41:20 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Mon Dec 21 17:41:20 2015 +0300 ---------------------------------------------------------------------- .../ignite/igfs/HadoopFileSystemFactory.java | 30 +++ .../igfs/secondary/IgfsSecondaryFileSystem.java | 11 +- .../ignite/internal/processors/igfs/IgfsEx.java | 24 +-- .../internal/processors/igfs/IgfsImpl.java | 5 +- .../internal/processors/igfs/IgfsPaths.java | 32 ++- .../igfs/IgfsSecondaryFileSystemImpl.java | 8 +- .../ignite/internal/util/lang/GridFunc.java | 24 +++ .../visor/node/VisorIgfsConfiguration.java | 75 ++++--- .../org/apache/ignite/hadoop/HadoopFsIssue.java | 71 +++++++ .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 207 ++++++++++++------- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 28 +-- .../hadoop/fs/v2/HadoopV2FileSystemFactory.java | 11 + .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 33 ++- .../hadoop/IgfsSecondaryFileSystemEx.java | 15 ++ .../KerberosSecondaryFileSystemProvider.java | 55 +++++ .../hadoop/SecondaryFileSystemProvider.java | 29 ++- .../fs/DefaultHadoopFileSystemFactory.java | 183 ++++++++++++++++ ...oopFileSystemUniversalFileSystemAdapter.java | 2 + .../testsuites/IgniteHadoopTestSuite.java | 2 +- parent/pom.xml | 2 +- 20 files changed, 696 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/igfs/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/HadoopFileSystemFactory.java b/modules/core/src/main/java/org/apache/ignite/igfs/HadoopFileSystemFactory.java new file mode 100644 index 0000000..86d39e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/HadoopFileSystemFactory.java @@ -0,0 +1,30 @@ +package org.apache.ignite.igfs; + +import java.io.Externalizable; +import java.io.IOException; +import java.net.URI; + +/** + * This factory is {@link Externalizable} because it should be transferable over the network. + * + * @param <T> The type + */ +public interface HadoopFileSystemFactory <T> extends Externalizable { + /** + * Gets the file system, possibly creating it or taking a cached instance. + * All the other data needed for the file system creation are expected to be contained + * in this object instance. + * + * @param userName The user name + * @return The file system. + * @throws IOException On error. + */ + public T get(String userName) throws IOException; + + /** + * Getter for the file system URI. + * + * @return The file system URI. + */ + public URI uri(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index ca6ecb7..696c81a 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -21,6 +21,7 @@ import java.io.OutputStream; import java.util.Collection; import java.util.Map; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.jetbrains.annotations.Nullable; @@ -28,7 +29,7 @@ import org.jetbrains.annotations.Nullable; /** * Secondary file system interface. */ -public interface IgfsSecondaryFileSystem { +public interface IgfsSecondaryFileSystem <F> { /** * Checks if the specified path exists. * @@ -197,9 +198,17 @@ public interface IgfsSecondaryFileSystem { * Gets the implementation specific properties of file system. * * @return Map of properties. + * @deprecated Should not be used. */ + @Deprecated public Map<String,String> properties(); + /** + * + * @return The factory. + */ + public @Nullable HadoopFileSystemFactory<F> getSecondaryFileSystemFactory(); + /** * Closes the secondary file system. http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 8ff7247..a338813 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -43,18 +43,18 @@ public interface IgfsEx extends IgniteFileSystem { /** File property: prefer writes to local node. */ public static final String PROP_PREFER_LOCAL_WRITES = "locWrite"; - /** Property name for path to Hadoop configuration. */ - public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; - - /** Property name for URI of file system. */ - public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; - - /** Property name for default user name of file system. - * NOTE: for secondary file system this is just a default user name, which is used - * when the 2ndary filesystem is used outside of any user context. - * If another user name is set in the context, 2ndary file system will work on behalf - * of that user, which is different from the default. */ - public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; +// /** Property name for path to Hadoop configuration. */ +// public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; +// +// /** Property name for URI of file system. */ +// public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; +// +// /** Property name for default user name of file system. +// * NOTE: for secondary file system this is just a default user name, which is used +// * when the 2ndary filesystem is used outside of any user context. +// * If another user name is set in the context, 2ndary file system will work on behalf +// * of that user, which is different from the default. */ +// public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; /** * Stops IGFS cleaning all used resources. http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 0d5cda3..7ea0333 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -254,7 +254,10 @@ public final class IgfsImpl implements IgfsEx { modeRslvr = new IgfsModeResolver(dfltMode, modes); - secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, + secondaryPaths = new IgfsPaths( + secondaryFs == null ? null : secondaryFs.properties(), + secondaryFs == null ? null : secondaryFs.getSecondaryFileSystemFactory(), + dfltMode, modeRslvr.modesOrdered()); // Check whether IGFS LRU eviction policy is set on data cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java index fbf89ce..bf7e825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.util.typedef.T2; @@ -33,13 +34,17 @@ import org.jetbrains.annotations.Nullable; /** * Description of path modes. */ -public class IgfsPaths implements Externalizable { +public class IgfsPaths <F> implements Externalizable { /** */ private static final long serialVersionUID = 0L; /** Additional secondary file system properties. */ + @Deprecated private Map<String, String> props; + /** */ + private HadoopFileSystemFactory<F> factory; + /** Default IGFS mode. */ private IgfsMode dfltMode; @@ -60,16 +65,22 @@ public class IgfsPaths implements Externalizable { * @param dfltMode Default IGFS mode. * @param pathModes Path modes. */ - public IgfsPaths(Map<String, String> props, IgfsMode dfltMode, @Nullable List<T2<IgfsPath, - IgfsMode>> pathModes) { + public IgfsPaths(Map<String, String> props, + HadoopFileSystemFactory<F> factory, + IgfsMode dfltMode, + @Nullable List<T2<IgfsPath, IgfsMode>> pathModes) { this.props = props; + this.factory = factory; this.dfltMode = dfltMode; this.pathModes = pathModes; } /** * @return Secondary file system properties. + * + * @deprecated */ + @Deprecated public Map<String, String> properties() { return props; } @@ -88,9 +99,21 @@ public class IgfsPaths implements Externalizable { return pathModes; } + /** + * Getter for factory. + * + * @return The factory. + */ + public HadoopFileSystemFactory<F> factory() { + return factory; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeStringMap(out, props); + + out.writeObject(factory); + U.writeEnum(out, dfltMode); if (pathModes != null) { @@ -109,6 +132,9 @@ public class IgfsPaths implements Externalizable { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { props = U.readStringMap(in); + + factory = (HadoopFileSystemFactory<F>)in.readObject(); + dfltMode = IgfsMode.fromOrdinal(in.readByte()); if (in.readBoolean()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 23d6322..1b1ce24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; @@ -31,7 +32,7 @@ import org.jetbrains.annotations.Nullable; /** * Secondary file system over native IGFS. */ -class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { +class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem <IgfsEx> { /** Delegate. */ private final IgfsEx igfs; @@ -126,4 +127,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { @Override public void close() throws IgniteException { // No-op. } + + /** {@inheritDoc} */ + @Override public HadoopFileSystemFactory<IgfsEx> getSecondaryFileSystemFactory() { + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 8d5a8e7..224d205 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2290,6 +2290,30 @@ public class GridFunc { } /** + * Nullifies an empty String. + * @param x The argument. + * @return Nullified argument. + */ + public static String nullifyEmpty(String x) { + if (isEmpty(x)) + return null; + + return x; + } + + /** + * Nullifies an empty collection. + * @param c The argument. + * @return Nullified argument. + */ + public static <T> Collection<T> nullifyEmpty(Collection<T> c) { + if (isEmpty(c)) + return null; + + return c; + } + + /** * Utility map getter. This method analogous to {@link #addIfAbsent(Map, Object, Callable)} * method but this one doesn't put the default value into the map when key is not found. * http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java index e85484d..4a2e7b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java @@ -23,15 +23,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME; +//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; +//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; +//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME; import static org.apache.ignite.internal.visor.util.VisorTaskUtils.compactClass; /** @@ -65,14 +66,16 @@ public class VisorIgfsConfiguration implements Serializable { /** Number of batches that can be concurrently sent to remote node. */ private int perNodeParallelBatchCnt; - /** URI of the secondary Hadoop file system. */ - private String secondaryHadoopFileSysUri; +// /** URI of the secondary Hadoop file system. */ +// private String secondaryHadoopFileSysUri; +// +// /** Path for the secondary hadoop file system config. */ +// private String secondaryHadoopFileSysCfgPath; +// +// /** User name for the secondary hadoop file system config. */ +// private String secondaryHadoopFileSysUserName; - /** Path for the secondary hadoop file system config. */ - private String secondaryHadoopFileSysCfgPath; - - /** User name for the secondary hadoop file system config. */ - private String secondaryHadoopFileSysUserName; + private HadoopFileSystemFactory factory; /** IGFS instance mode. */ private IgfsMode dfltMode; @@ -144,11 +147,14 @@ public class VisorIgfsConfiguration implements Serializable { IgfsSecondaryFileSystem secFs = igfs.getSecondaryFileSystem(); if (secFs != null) { - Map<String, String> props = secFs.properties(); + //Map<String, String> props = secFs.properties(); + + //cfg.secondaryHadoopFileSysUri = props.get(SECONDARY_FS_URI); + //cfg.secondaryHadoopFileSysCfgPath = props.get(SECONDARY_FS_CONFIG_PATH); + //cfg.secondaryHadoopFileSysUserName = props.get(SECONDARY_FS_USER_NAME); - cfg.secondaryHadoopFileSysUri = props.get(SECONDARY_FS_URI); - cfg.secondaryHadoopFileSysCfgPath = props.get(SECONDARY_FS_CONFIG_PATH); - cfg.secondaryHadoopFileSysUserName = props.get(SECONDARY_FS_USER_NAME); + // Just take and save the factory object: + cfg.factory = secFs.getSecondaryFileSystemFactory(); } cfg.dfltMode = igfs.getDefaultMode(); @@ -250,25 +256,34 @@ public class VisorIgfsConfiguration implements Serializable { return perNodeParallelBatchCnt; } - /** - * @return URI of the secondary Hadoop file system. - */ - @Nullable public String secondaryHadoopFileSystemUri() { - return secondaryHadoopFileSysUri; - } - - /** - * @return User name of the secondary Hadoop file system. - */ - @Nullable public String secondaryHadoopFileSystemUserName() { - return secondaryHadoopFileSysUserName; - } +// /** +// * @return URI of the secondary Hadoop file system. +// */ +// @Nullable public String secondaryHadoopFileSystemUri() { +// return secondaryHadoopFileSysUri; +// } +// +// /** +// * @return User name of the secondary Hadoop file system. +// */ +// @Nullable public String secondaryHadoopFileSystemUserName() { +// return secondaryHadoopFileSysUserName; +// } +// +// /** +// * @return Path for the secondary hadoop file system config. +// */ +// @Nullable public String secondaryHadoopFileSystemConfigPath() { +// return secondaryHadoopFileSysCfgPath; +// } /** - * @return Path for the secondary hadoop file system config. + * + * @param <T> + * @return */ - @Nullable public String secondaryHadoopFileSystemConfigPath() { - return secondaryHadoopFileSysCfgPath; + public <T> HadoopFileSystemFactory<T> secondaryFileSystemFactory() { + return factory; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java new file mode 100644 index 0000000..82314f1 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/HadoopFsIssue.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; + +/** + * Comment. + */ +public class HadoopFsIssue { + /** + * + * @param args + */ + public static void main(String args[]) { + String uri = null; + String cfgPath = null; + String user = null; + + for (String arg : args) { + if (arg.startsWith("uri=")) + uri = arg.split("=")[1].trim(); + else if (arg.startsWith("cfg=")) + cfgPath = arg.split("=")[1].trim(); + else if (arg.startsWith("user=")) + user = arg.split("=")[1].trim(); + else + throw new IllegalArgumentException("Unknown argument:" + arg); + } + + System.out.println("Connecting to HDFS with the following settings [uri=" + uri + ", cfg=" + cfgPath + ", user=" + user + ']'); + + try { + SecondaryFileSystemProvider provider = new SecondaryFileSystemProvider(uri, cfgPath); + + FileSystem fs = provider.createFileSystem(user); + + RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path("/tmp"), true); + + System.out.println("Got the iterator"); + + while (iter.hasNext()) { + LocatedFileStatus status = iter.next(); + + System.out.println(status); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 1ca6938..c5124f0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -24,7 +24,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -45,64 +45,80 @@ import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.ValueFactory; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; import org.apache.ignite.internal.processors.igfs.IgfsFileInfo; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.F; +import static org.apache.ignite.internal.util.typedef.F.*; + +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME; /** * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. * In fact, this class deals with different FileSystems depending on the user context, * see {@link IgfsUserContext#currentUser()}. */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem { - /** Properties of file system, see {@link #properties()} - * - * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} - * See {@link IgfsEx#SECONDARY_FS_URI} - * See {@link IgfsEx#SECONDARY_FS_USER_NAME} - * */ - private final Map<String, String> props = new HashMap<>(); +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem <FileSystem> { +// /** Properties of file system, see {@link #properties()} +// * */ +// private final Map<String, String> props = new HashMap<>(); /** Secondary file system provider. */ - private final SecondaryFileSystemProvider secProvider; - - /** The default user name. It is used if no user context is set. */ - private final String dfltUserName; + //private SecondaryFileSystemProvider secProvider; /** FileSystem instance created for the default user. * Stored outside the fileSysLazyMap due to performance reasons. */ - private final FileSystem dfltFs; + private FileSystem dfltFs; + +// /** */ +// private String uriStr; +// +// /** Note: */ +// private URI uri; +// +// /** */ +// private Collection<String> cfgPathsStr; +// +// /** */ +// private @Nullable Collection<URI> cfgPaths; - /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( - new ValueFactory<String, FileSystem>() { - @Override public FileSystem createValue(String key) { - try { - assert !F.isEmpty(key); + /** The default user name. It is used if no user context is set. */ + private String dfltUserName = IgfsUtils.fixUserName(null); + + /** */ + private HadoopFileSystemFactory<FileSystem> fsFactory; + +// /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ +// private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( +// new ValueFactory<String, FileSystem>() { +// @Override public FileSystem createValue(String key) { +// try { +// assert !F.isEmpty(key); +// +// return secProvider.createFileSystem(key); +// } +// catch (IOException ioe) { +// throw new IgniteException(ioe); +// } +// } +// } +// ); - return secProvider.createFileSystem(key); - } - catch (IOException ioe) { - throw new IgniteException(ioe); - } - } - } - ); + /** + * Default constructor for Spring. + */ + public IgniteHadoopIgfsSecondaryFileSystem() { + // noop. + } /** * Simple constructor that is to be used by default. @@ -136,43 +152,54 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys */ public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, @Nullable String userName) throws IgniteCheckedException { - // Treat empty uri and userName arguments as nulls to improve configuration usability: - if (F.isEmpty(uri)) - uri = null; - - if (F.isEmpty(cfgPath)) - cfgPath = null; - - if (F.isEmpty(userName)) - userName = null; - this.dfltUserName = IgfsUtils.fixUserName(userName); + uri = nullifyEmpty(uri); + if (uri != null) + U.warn(null, "This constructor is deprecated. URI value passed in will be ignored."); - try { - this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); - - // File system creation for the default user name. - // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field: - this.dfltFs = secProvider.createFileSystem(dfltUserName); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - assert dfltFs != null; + cfgPath = nullifyEmpty(cfgPath); + if (cfgPath != null) + U.warn(null, "This constructor is deprecated. The configurationPath value passed in will be ignored."); - uri = secProvider.uri().toString(); + setDfltUserName(userName); + } - if (!uri.endsWith("/")) - uri += "/"; + /** + * + * @param factory + */ + public void setFsFactory(HadoopFileSystemFactory factory) { + A.ensure(factory != null, "Factory value must not be null."); - if (cfgPath != null) - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + this.fsFactory = factory; + } - props.put(SECONDARY_FS_URI, uri); - props.put(SECONDARY_FS_USER_NAME, dfltUserName); + /** + * + * @param dfltUserName + */ + public void setDfltUserName(String dfltUserName) { + this.dfltUserName = IgfsUtils.fixUserName(nullifyEmpty(dfltUserName)); } +// /** +// * Sets the file system properties. +// */ +// private void setProperties() { +// String uri = this.uri.toString(); +// +// if (!uri.endsWith("/")) +// uri += "/"; +// +// String cfgPath = secProvider.configurationPath(); +// +// if (cfgPath != null) +// props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); +// +// props.put(SECONDARY_FS_URI, uri); +// props.put(SECONDARY_FS_USER_NAME, dfltUserName); +// } + /** * Convert IGFS path into Hadoop path. * @@ -488,7 +515,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Map<String, String> properties() { - return props; + return Collections.emptyMap(); } /** {@inheritDoc} */ @@ -496,18 +523,20 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys Exception e = null; try { - dfltFs.close(); + if (dfltFs != null) + dfltFs.close(); } catch (Exception e0) { e = e0; } try { - fileSysLazyMap.close(); + if (fsFactory instanceof LifecycleAware) + ((LifecycleAware)fsFactory).stop(); } - catch (IgniteCheckedException ice) { + catch (IgniteException ie) { if (e == null) - e = ice; + e = ie; } if (e != null) @@ -538,6 +567,44 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys if (F.eq(user, dfltUserName)) return dfltFs; // optimization - return fileSysLazyMap.getOrCreate(user); + try { + return fsFactory.get(user); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + + /** + * Should be invoked by client (from Spring?) after all the setters invoked. + * TODO: how this should be invoked? + * + * @throws IgniteCheckedException + */ + public void start() throws IgniteCheckedException { + A.ensure(fsFactory != null, "factory"); + A.ensure(dfltUserName != null, "dfltUserName"); + + if (fsFactory instanceof LifecycleAware) + ((LifecycleAware) fsFactory).start(); + + try { + //this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); + + // File system creation for the default user name. + // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field: + //this.dfltFs = secProvider.createFileSystem(dfltUserName); + this.dfltFs = fsFactory.get(dfltUserName); + + assert dfltFs != null; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public HadoopFileSystemFactory<FileSystem> getSecondaryFileSystemFactory() { + return fsFactory; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 778792a..1b748fb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -51,7 +52,6 @@ import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; @@ -85,8 +85,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; /** * {@code IGFS} Hadoop 1.x file system driver over file system API. To use @@ -293,7 +291,7 @@ public class IgniteHadoopFileSystem extends FileSystem { igfsGrpBlockSize = handshake.blockSize(); - IgfsPaths paths = handshake.secondaryPaths(); + final IgfsPaths<FileSystem> paths = handshake.secondaryPaths(); // Initialize client logger. Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); @@ -327,21 +325,27 @@ public class IgniteHadoopFileSystem extends FileSystem { } if (initSecondary) { - Map<String, String> props = paths.properties(); +// Map<String, String> props = paths.properties(); +// +// String secUri = props.get(SECONDARY_FS_URI); +// String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUri = props.get(SECONDARY_FS_URI); - String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); + HadoopFileSystemFactory<FileSystem> factory = paths.factory(); - try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + A.ensure(factory != null, "Secondary file system factory should not be null."); + + secondaryUri = factory.uri(); - secondaryFs = secProvider.createFileSystem(user); + A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); + + try { + //SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - secondaryUri = secProvider.uri(); + secondaryFs = factory.get(user); //secProvider.createFileSystem(user); } catch (IOException e) { if (!mgmt) - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); + throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); else LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " + "will have no effect): " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java new file mode 100644 index 0000000..c2ab620 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/HadoopV2FileSystemFactory.java @@ -0,0 +1,11 @@ +//package org.apache.ignite.hadoop.fs.v2; +// +//import org.apache.hadoop.fs.AbstractFileSystem; +//import org.apache.hadoop.fs.FileSystem; +// +///** +// * Created by ivan on 18.12.15. +// */ +//public interface HadoopV2FileSystemFactory { +// AbstractFileSystem create(String uri, String configPath, String userName); +//} http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 99ca1ec..865a2bc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -52,12 +52,12 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; @@ -92,8 +92,8 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; +//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; +//import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; /** * {@code IGFS} Hadoop 2.x file system driver over file system API. To use @@ -302,7 +302,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea grpBlockSize = handshake.blockSize(); - IgfsPaths paths = handshake.secondaryPaths(); + IgfsPaths<AbstractFileSystem> paths = handshake.secondaryPaths(); Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); @@ -335,20 +335,31 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea } if (initSecondary) { - Map<String, String> props = paths.properties(); +// Map<String, String> props = paths.properties(); +// +// String secUri = props.get(SECONDARY_FS_URI); +// String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUri = props.get(SECONDARY_FS_URI); - String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); + HadoopFileSystemFactory<AbstractFileSystem> factory + = (HadoopFileSystemFactory<AbstractFileSystem>)paths.factory(); + + A.ensure(secondaryUri != null, "File system factory uri should not be null."); + + secondaryUri = factory.uri(); + + A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + //SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + + secondaryFs = factory.get(user); - secondaryFs = secProvider.createAbstractFileSystem(user); + //secondaryFs = secProvider.createAbstractFileSystem(user); - secondaryUri = secProvider.uri(); + //secondaryUri = secProvider.uri(); } catch (IOException e) { - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); + throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java new file mode 100644 index 0000000..582e798 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgfsSecondaryFileSystemEx.java @@ -0,0 +1,15 @@ +//package org.apache.ignite.internal.processors.hadoop; +// +//import org.apache.ignite.IgniteCheckedException; +//import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +// +///** +// * +// */ +//public interface IgfsSecondaryFileSystemEx extends IgfsSecondaryFileSystem { +// /** +// * +// * @throws IgniteCheckedException +// */ +// public void start() throws IgniteCheckedException; +//} http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java new file mode 100644 index 0000000..503ac46 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/KerberosSecondaryFileSystemProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.jetbrains.annotations.Nullable; + +/** + * See https://issues.apache.org/jira/browse/IGNITE-2195 . + */ +public class KerberosSecondaryFileSystemProvider extends SecondaryFileSystemProvider { + /** + * Constructor. + **/ + public KerberosSecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException { + super(secUri, secConfPath); + } + + /** {@inheritDoc} */ + @Override public FileSystem createFileSystem(String userName) throws IOException { + UserGroupInformation.setConfiguration(cfg); + + UserGroupInformation ugi = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getCurrentUser()); + + try { + return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { + @Override public FileSystem run() throws Exception { + return FileSystem.get(uri, cfg); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index d5be074..d0326ea 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.IgniteUtils; +import static org.apache.ignite.internal.util.typedef.F.*; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -38,10 +39,13 @@ import org.jetbrains.annotations.Nullable; */ public class SecondaryFileSystemProvider { /** Configuration of the secondary filesystem, never null. */ - private final Configuration cfg = HadoopUtils.safeCreateConfiguration(); + protected final Configuration cfg = HadoopUtils.safeCreateConfiguration(); /** The secondary filesystem URI, never null. */ - private final URI uri; + protected final URI uri; + + /** Configuration file path. */ + @Nullable protected final String confPath; /** * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be @@ -53,15 +57,17 @@ public class SecondaryFileSystemProvider { * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. * @throws IOException */ - public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath) throws IOException { - if (secConfPath != null) { - URL url = U.resolveIgniteUrl(secConfPath); + public SecondaryFileSystemProvider(@Nullable String secUri, @Nullable String secConfPath) throws IOException { + secUri = nullifyEmpty(secUri); + confPath = nullifyEmpty(secConfPath); + + if (confPath != null) { + URL url = U.resolveIgniteUrl(confPath); if (url == null) { // If secConfPath is given, it should be resolvable: throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " + - "(ensure that it exists locally and you have read access to it): " + secConfPath); + "(ensure that it exists locally and you have read access to it): " + confPath); } cfg.addResource(url); @@ -90,7 +96,7 @@ public class SecondaryFileSystemProvider { * @throws IOException */ public FileSystem createFileSystem(String userName) throws IOException { - userName = IgfsUtils.fixUserName(userName); + userName = IgfsUtils.fixUserName(nullifyEmpty(userName)); final FileSystem fileSys; @@ -136,4 +142,11 @@ public class SecondaryFileSystemProvider { public URI uri() { return uri; } + + /** + * @return The configuration path, if any. + */ + @Nullable public String configurationPath() { + return confPath; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java new file mode 100644 index 0000000..5bbf4d9 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/DefaultHadoopFileSystemFactory.java @@ -0,0 +1,183 @@ +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Collection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.HadoopFileSystemFactory; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.processors.igfs.IgfsPaths; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; + +import static org.apache.ignite.internal.util.lang.GridFunc.nullifyEmpty; + +/** + * The class is to be instantiated as a Spring beans, so it must have public zero-arg constructor. + * The class is serializable as it will be transferred over the network as a part of {@link IgfsPaths} object. + */ +public class DefaultHadoopFileSystemFactory implements HadoopFileSystemFactory<FileSystem>, Externalizable, LifecycleAware { + /** Configuration of the secondary filesystem, never null. */ + protected final Configuration cfg = HadoopUtils.safeCreateConfiguration(); + + /** */ + private URI uri; + + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) { + try { + assert !F.isEmpty(key); + + return createFileSystem(key); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + + public DefaultHadoopFileSystemFactory() { + // + } + + @Override public FileSystem get(String userName) throws IOException { + return fileSysLazyMap.getOrCreate(userName); + } + + public void setUri(URI uri) { + this.uri = uri; + } + + @Override public URI uri() { + return uri; + } + +// public void setCfg(Configuration cfg) { +// this.cfg = cfg; +// } + + /** + * Configuration(s) setter, to be invoked from Spring config. + * @param cfgPaths + */ + public void setCfgPaths(Collection<String> cfgPaths) { + cfgPaths = nullifyEmpty(cfgPaths); + + if (cfgPaths == null) + return; + + for (String confPath: cfgPaths) { + confPath = nullifyEmpty(confPath); + + if (confPath != null) { + URL url = U.resolveIgniteUrl(confPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " + + "(ensure that it exists locally and you have read access to it): " + confPath); + } + + cfg.addResource(url); + } + } + } + + + protected void init() throws IOException { + String secUri = nullifyEmpty(uri == null ? null : uri.toString()); + + A.ensure(cfg != null, "config"); + + // if secondary fs URI is not given explicitly, try to get it from the configuration: + if (secUri == null) + uri = FileSystem.getDefaultUri(cfg); + else { + try { + uri = new URI(secUri); + } + catch (URISyntaxException use) { + throw new IOException("Failed to resolve secondary file system URI: " + secUri); + } + } + + // Disable caching: + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); + + cfg.setBoolean(prop, true); + } + + /** + * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. + * @throws IOException + */ + protected FileSystem createFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(nullifyEmpty(userName)); + + final FileSystem fileSys; + + try { + fileSys = FileSystem.get(uri, cfg, userName); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + + return fileSys; + } + + @Override public void writeExternal(ObjectOutput out) throws IOException { + cfg.write(out); + + U.writeString(out, uri.toString()); + } + + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cfg.clear(); + + cfg.readFields(in); + + String uriStr = U.readString(in); + + try { + uri = new URI(uriStr); + } + catch (URISyntaxException use) { + throw new IOException(use); + } + } + + @Override public void start() throws IgniteException { + try { + init(); + } + catch (IOException ice) { + throw new IgniteException(ice); + } + } + + @Override public void stop() throws IgniteException { + try { + fileSysLazyMap.close(); + } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java index 608bd25..867351f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java @@ -42,6 +42,8 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile * @param fs the filesystem to be wrapped. */ public HadoopFileSystemUniversalFileSystemAdapter(FileSystem fs) { + assert fs != null; + this.fileSys = fs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 0216f4b..eac6bb8 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -183,7 +183,7 @@ public class IgniteHadoopTestSuite extends TestSuite { * @throws Exception If failed. */ public static void downloadHadoop() throws Exception { - String ver = IgniteSystemProperties.getString("hadoop.version", "2.4.1"); + String ver = IgniteSystemProperties.getString("hadoop.version", "2.6.0"); X.println("Will use Hadoop version: " + ver); http://git-wip-us.apache.org/repos/asf/ignite/blob/7a3f9dd3/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index f665d40..0481088 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -69,7 +69,7 @@ <guava14.version>14.0.1</guava14.version> <guava16.version>16.0.1</guava16.version> <h2.version>1.3.175</h2.version> - <hadoop.version>2.4.1</hadoop.version> + <hadoop.version>2.6.0</hadoop.version> <httpclient.version>4.5.1</httpclient.version> <httpcore.version>4.4.3</httpcore.version> <jackson.version>1.9.13</jackson.version>