Repository: incubator-ignite Updated Branches: refs/heads/ignite-218-hdfs-only f23e02fad -> 2a472e161
# IGNITE-218: Review. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2a472e16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2a472e16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2a472e16 Branch: refs/heads/ignite-218-hdfs-only Commit: 2a472e16144c243d94c69ce022884889bd87f464 Parents: f23e02f Author: vozerov-gridgain <[email protected]> Authored: Thu May 28 17:41:18 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu May 28 17:41:18 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 3 ++ .../shmem/IpcSharedMemoryServerEndpoint.java | 2 +- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 5 ++- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 1 + .../hadoop/fs/HadoopLazyConcurrentMap.java | 38 ++++++++++---------- .../hadoop/v2/HadoopV2TaskContext.java | 1 + 6 files changed, 28 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java index 567fae5..1a91677 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -17,6 +17,7 @@ package org.apache.ignite.igfs; +// TODO: Remove. import org.apache.ignite.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -46,6 +47,7 @@ public abstract class IgfsUserContext { if (F.isEmpty(user)) // use NPE to ensure that #doAs() caller will not treat this exception // as the one thrown from the closure: + // TODO: use IllegalArgument or IgniteException throw new NullPointerException("Failed to use null or empty user name."); final String ctxUser = userStackThreadLocal.get(); @@ -95,6 +97,7 @@ public abstract class IgfsUserContext { if (F.isEmpty(user)) // use NPE to ensure that #doAs() caller will not treat this exception // as the one thrown from the closure: + // TODO: use IllegalArgument or IgniteException throw new NullPointerException("Failed to use null or empty user name."); final String ctxUser = userStackThreadLocal.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java index b2bc4cf..86a0886 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java @@ -592,7 +592,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint { if (log.isDebugEnabled()) log.debug("Token directory is being processed concurrently: " + workTokDir.getAbsolutePath()); } - catch (InterruptedIOException | FileLockInterruptionException ignored) { + catch (InterruptedIOException ignored) { Thread.currentThread().interrupt(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 024cc68..9ed92ad 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -108,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @throws IgniteCheckedException In case of error. */ public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, - @Nullable String userName) - throws IgniteCheckedException { + @Nullable String userName) throws IgniteCheckedException { // Treat empty uri and userName arguments as nulls to improve configuration usability: if (F.isEmpty(uri)) uri = null; @@ -144,7 +143,6 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); props.put(SECONDARY_FS_URI, uri); - props.put(SECONDARY_FS_USER_NAME, dfltUserName); } @@ -475,6 +473,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys List<IOException> ioExs = new LinkedList<>(); + // TODO: Close is not thread-safe. Set<String> keySet = map.keySet(); for (String key: keySet) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 46c9ba4..c0a9ade 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -190,6 +190,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (user == null) { UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); + if (currUgi != null) user = currUgi.getShortUserName(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index cdafdde..cc36ea0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.future.*; +// TODO: Remove unused import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import org.jsr166.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * Maps values by keys. @@ -56,6 +59,7 @@ public class HadoopLazyConcurrentMap<K, V> { * @throws IgniteException on error */ public V getOrCreate(K k) { + // TODO: Do "get" first. final ValueWrapper wNew = new ValueWrapper(k); ValueWrapper w = map.putIfAbsent(k, wNew); @@ -74,7 +78,7 @@ public class HadoopLazyConcurrentMap<K, V> { return v; } - catch (IgniteInterruptedCheckedException ie) { + catch (IgniteCheckedException ie) { throw new IgniteException(ie); } } @@ -93,7 +97,7 @@ public class HadoopLazyConcurrentMap<K, V> { try { return w.getValue(); } - catch (IgniteInterruptedCheckedException ie) { + catch (IgniteCheckedException ie) { throw new IgniteException(ie); } } @@ -115,20 +119,16 @@ public class HadoopLazyConcurrentMap<K, V> { map.clear(); } - /** * Helper class that drives the lazy value creation. */ private class ValueWrapper { - /** Value creation latch */ - private final CountDownLatch vlueCrtLatch = new CountDownLatch(1); + /** Future. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); /** the key */ private final K key; - /** the value */ - private V v; - /** * Creates new wrapper. */ @@ -140,14 +140,17 @@ public class HadoopLazyConcurrentMap<K, V> { * Initializes the value using the factory. */ private void init() { - final V v0 = factory.createValue(key); - - if (v0 == null) - throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + try { + final V v0 = factory.createValue(key); - v = v0; + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); - vlueCrtLatch.countDown(); + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } } /** @@ -155,10 +158,8 @@ public class HadoopLazyConcurrentMap<K, V> { * @return the value * @throws IgniteInterruptedCheckedException if interrupted during wait. */ - @Nullable V getValue() throws IgniteInterruptedCheckedException { - U.await(vlueCrtLatch); - - return v; + @Nullable V getValue() throws IgniteCheckedException { + return fut.get(); } } @@ -170,6 +171,7 @@ public class HadoopLazyConcurrentMap<K, V> { public interface ValueFactory <K, V> { /** * Creates the new value. Must never return null. + * * @param key the key to create value for * @return the value. * @throws IgniteException on failure. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2a472e16/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 9f5220d..2270caa 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -241,6 +241,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { try { FileSystem fs = FileSystem.get(jobConf()); + // TODO: Remove //HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); LocalFileSystem locFs = FileSystem.getLocal(jobConf());
