IGNITE-5846 Add support of distributed matrices for OLS regression. This closes #3030.
Signed-off-by: nikolay_tikhonov <ntikho...@gridgain.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0a86018 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0a86018 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0a86018 Branch: refs/heads/master Commit: b0a86018693581065f789635facb88b1e8dac834 Parents: cbd7e39 Author: YuriBabak <y.ch...@gmail.com> Authored: Fri Nov 17 16:06:34 2017 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Nov 17 16:07:51 2017 +0300 ---------------------------------------------------------------------- .../clustering/KMeansDistributedClusterer.java | 13 +- .../ignite/ml/math/distributed/CacheUtils.java | 278 +++--- .../math/distributed/keys/BlockMatrixKey.java | 38 - .../distributed/keys/DataStructureCacheKey.java | 35 + .../math/distributed/keys/MatrixBlockKey.java | 38 + .../math/distributed/keys/MatrixCacheKey.java | 35 - .../math/distributed/keys/RowColMatrixKey.java | 2 +- .../math/distributed/keys/VectorBlockKey.java | 34 + .../distributed/keys/impl/BlockMatrixKey.java | 164 ---- .../distributed/keys/impl/MatrixBlockKey.java | 162 ++++ .../distributed/keys/impl/SparseMatrixKey.java | 14 +- .../distributed/keys/impl/VectorBlockKey.java | 151 +++ .../ignite/ml/math/functions/Functions.java | 3 +- .../ml/math/impls/matrix/AbstractMatrix.java | 24 +- .../ignite/ml/math/impls/matrix/BlockEntry.java | 50 - .../ml/math/impls/matrix/MatrixBlockEntry.java | 50 + .../matrix/SparseBlockDistributedMatrix.java | 153 ++- .../impls/matrix/SparseDistributedMatrix.java | 102 +- .../storage/matrix/BlockMatrixStorage.java | 96 +- .../storage/matrix/BlockVectorStorage.java | 374 ++++++++ .../impls/storage/matrix/MapWrapperStorage.java | 5 +- .../matrix/SparseDistributedMatrixStorage.java | 21 +- .../vector/SparseDistributedVectorStorage.java | 280 ++++++ .../vector/SparseBlockDistributedVector.java | 139 +++ 
.../impls/vector/SparseDistributedVector.java | 157 ++++ .../ml/math/impls/vector/VectorBlockEntry.java | 49 + .../apache/ignite/ml/math/util/MatrixUtil.java | 4 +- .../ml/math/MathImplDistributedTestSuite.java | 16 +- .../SparseDistributedBlockMatrixTest.java | 86 +- .../matrix/SparseDistributedMatrixTest.java | 27 +- .../SparseDistributedVectorStorageTest.java | 121 +++ .../SparseBlockDistributedVectorTest.java | 181 ++++ .../vector/SparseDistributedVectorTest.java | 192 ++++ ...tedBlockOLSMultipleLinearRegressionTest.java | 926 ++++++++++++++++++ ...tributedOLSMultipleLinearRegressionTest.java | 934 +++++++++++++++++++ .../OLSMultipleLinearRegressionTest.java | 1 + .../ml/regressions/RegressionsTestSuite.java | 2 +- 37 files changed, 4351 insertions(+), 606 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java index 6c25edc..4286f42 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java @@ -196,12 +196,11 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri return list; }, - key -> key.matrixId().equals(uid), + key -> key.dataStructureId().equals(uid), (list1, list2) -> { list1.addAll(list2); return list1; - }, - ArrayList::new + }, ArrayList::new ); } @@ -216,7 +215,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri return map; }, - key -> key.matrixId().equals(points.getUUID()), + key -> 
key.dataStructureId().equals(points.getUUID()), (map1, map2) -> { map1.putAll(map2); return map1; @@ -247,10 +246,10 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri countMap.compute(resInd, (ind, v) -> v != null ? v + 1 : 1); return countMap; }, - key -> key.matrixId().equals(uid), + key -> key.dataStructureId().equals(uid), (map1, map2) -> MapUtil.mergeMaps(map1, map2, (integer, integer2) -> integer2 + integer, ConcurrentHashMap::new), - ConcurrentHashMap::new); + ConcurrentHashMap::new); } /** */ @@ -278,7 +277,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri return counts; }, - key -> key.matrixId().equals(uid), + key -> key.dataStructureId().equals(uid), SumsAndCounts::merge, SumsAndCounts::new ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java index b9eb386..37384b8 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java @@ -17,16 +17,6 @@ package org.apache.ignite.ml.math.distributed; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BinaryOperator; -import java.util.stream.Stream; -import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; @@ -40,18 +30,21 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; import 
org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.KeyMapper; -import org.apache.ignite.ml.math.distributed.keys.BlockMatrixKey; -import org.apache.ignite.ml.math.distributed.keys.MatrixCacheKey; -import org.apache.ignite.ml.math.functions.IgniteBiFunction; -import org.apache.ignite.ml.math.functions.IgniteBinaryOperator; -import org.apache.ignite.ml.math.functions.IgniteConsumer; -import org.apache.ignite.ml.math.functions.IgniteDoubleFunction; -import org.apache.ignite.ml.math.functions.IgniteFunction; -import org.apache.ignite.ml.math.functions.IgniteSupplier; -import org.apache.ignite.ml.math.functions.IgniteTriFunction; -import org.apache.ignite.ml.math.impls.matrix.BlockEntry; +import org.apache.ignite.ml.math.distributed.keys.DataStructureCacheKey; +import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey; +import org.apache.ignite.ml.math.distributed.keys.impl.MatrixBlockKey; +import org.apache.ignite.ml.math.distributed.keys.impl.VectorBlockKey; +import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException; +import org.apache.ignite.ml.math.functions.*; +import org.apache.ignite.ml.math.impls.matrix.MatrixBlockEntry; +import org.apache.ignite.ml.math.impls.vector.VectorBlockEntry; + +import javax.cache.Cache; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BinaryOperator; +import java.util.stream.Stream; /** * Distribution-related misc. support. @@ -104,11 +97,11 @@ public class CacheUtils { /** * @param cacheName Cache name. - * @param k Key into the cache. - * @param <K> Key type. + * @param k Key into the cache. + * @param <K> Key type. * @return Cluster group for given key. 
*/ - public static <K> ClusterGroup groupForKey(String cacheName, K k) { + protected static <K> ClusterGroup getClusterGroupForGivenKey(String cacheName, K k) { return ignite().cluster().forNode(ignite().affinity(cacheName).mapKeyToNode(k)); } @@ -116,8 +109,8 @@ public class CacheUtils { * @param cacheName Cache name. * @param keyMapper {@link KeyMapper} to validate cache key. * @param valMapper {@link ValueMapper} to obtain double value for given cache key. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param <K> Cache key object type. + * @param <V> Cache value object type. * @return Sum of the values obtained for valid keys. */ public static <K, V> double sum(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { @@ -126,8 +119,7 @@ public class CacheUtils { double v = valMapper.toDouble(ce.entry().getValue()); return acc == null ? v : acc + v; - } - else + } else return acc; }); @@ -146,19 +138,17 @@ public class CacheUtils { Collection<Double> subSums = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> { V v = ce.entry().getValue(); - double sum = 0.0; + double sum; if (v instanceof Map) { - Map<Integer, Double> map = (Map<Integer, Double>)v; + Map<Integer, Double> map = (Map<Integer, Double>) v; sum = sum(map.values()); - } - else if (v instanceof BlockEntry) { - BlockEntry be = (BlockEntry)v; + } else if (v instanceof MatrixBlockEntry) { + MatrixBlockEntry be = (MatrixBlockEntry) v; sum = be.sum(); - } - else + } else throw new UnsupportedOperationException(); return acc == null ? sum : acc + sum; @@ -180,8 +170,8 @@ public class CacheUtils { * @param cacheName Cache name. * @param keyMapper {@link KeyMapper} to validate cache key. * @param valMapper {@link ValueMapper} to obtain double value for given cache key. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param <K> Cache key object type. + * @param <V> Cache value object type. 
* @return Minimum value for valid keys. */ public static <K, V> double min(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { @@ -193,8 +183,7 @@ public class CacheUtils { return v; else return Math.min(acc, v); - } - else + } else return acc; }); @@ -216,16 +205,14 @@ public class CacheUtils { double min; if (v instanceof Map) { - Map<Integer, Double> map = (Map<Integer, Double>)v; + Map<Integer, Double> map = (Map<Integer, Double>) v; min = Collections.min(map.values()); - } - else if (v instanceof BlockEntry) { - BlockEntry be = (BlockEntry)v; + } else if (v instanceof MatrixBlockEntry) { + MatrixBlockEntry be = (MatrixBlockEntry) v; min = be.minValue(); - } - else + } else throw new UnsupportedOperationException(); if (acc == null) @@ -253,16 +240,14 @@ public class CacheUtils { double max; if (v instanceof Map) { - Map<Integer, Double> map = (Map<Integer, Double>)v; + Map<Integer, Double> map = (Map<Integer, Double>) v; max = Collections.max(map.values()); - } - else if (v instanceof BlockEntry) { - BlockEntry be = (BlockEntry)v; + } else if (v instanceof MatrixBlockEntry) { + MatrixBlockEntry be = (MatrixBlockEntry) v; max = be.maxValue(); - } - else + } else throw new UnsupportedOperationException(); if (acc == null) @@ -279,8 +264,8 @@ public class CacheUtils { * @param cacheName Cache name. * @param keyMapper {@link KeyMapper} to validate cache key. * @param valMapper {@link ValueMapper} to obtain double value for given cache key. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param <K> Cache key object type. + * @param <V> Cache value object type. * @return Maximum value for valid keys. */ public static <K, V> double max(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { @@ -292,8 +277,7 @@ public class CacheUtils { return v; else return Math.max(acc, v); - } - else + } else return acc; }); @@ -304,12 +288,12 @@ public class CacheUtils { * @param cacheName Cache name. 
* @param keyMapper {@link KeyMapper} to validate cache key. * @param valMapper {@link ValueMapper} to obtain double value for given cache key. - * @param mapper Mapping {@link IgniteFunction}. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param mapper Mapping {@link IgniteFunction}. + * @param <K> Cache key object type. + * @param <V> Cache value object type. */ public static <K, V> void map(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper, - IgniteFunction<Double, Double> mapper) { + IgniteFunction<Double, Double> mapper) { foreach(cacheName, (CacheEntry<K, V> ce) -> { K k = ce.entry().getKey(); @@ -321,7 +305,7 @@ public class CacheUtils { /** * @param matrixUuid Matrix UUID. - * @param mapper Mapping {@link IgniteFunction}. + * @param mapper Mapping {@link IgniteFunction}. */ @SuppressWarnings("unchecked") public static <K, V> void sparseMap(UUID matrixUuid, IgniteDoubleFunction<Double> mapper, String cacheName) { @@ -335,18 +319,16 @@ public class CacheUtils { V v = ce.entry().getValue(); if (v instanceof Map) { - Map<Integer, Double> map = (Map<Integer, Double>)v; + Map<Integer, Double> map = (Map<Integer, Double>) v; for (Map.Entry<Integer, Double> e : (map.entrySet())) e.setValue(mapper.apply(e.getValue())); - } - else if (v instanceof BlockEntry) { - BlockEntry be = (BlockEntry)v; + } else if (v instanceof MatrixBlockEntry) { + MatrixBlockEntry be = (MatrixBlockEntry) v; be.map(mapper); - } - else + } else throw new UnsupportedOperationException(); ce.cache().put(k, v); @@ -360,34 +342,40 @@ public class CacheUtils { */ private static <K> IgnitePredicate<K> sparseKeyFilter(UUID matrixUuid) { return key -> { - if (key instanceof MatrixCacheKey) - return ((MatrixCacheKey)key).matrixId().equals(matrixUuid); + if (key instanceof DataStructureCacheKey) + return ((DataStructureCacheKey) key).dataStructureId().equals(matrixUuid); else if (key instanceof IgniteBiTuple) - return ((IgniteBiTuple<Integer, 
UUID>)key).get2().equals(matrixUuid); + return ((IgniteBiTuple<Integer, UUID>) key).get2().equals(matrixUuid); + else if (key instanceof MatrixBlockKey) + return ((MatrixBlockKey) key).dataStructureId().equals(matrixUuid); + else if (key instanceof RowColMatrixKey) + return ((RowColMatrixKey) key).dataStructureId().equals(matrixUuid); + else if (key instanceof VectorBlockKey) + return ((VectorBlockKey) key).dataStructureId().equals(matrixUuid); else - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(); // TODO: handle my poor doubles }; } /** * @param cacheName Cache name. - * @param fun An operation that accepts a cache entry and processes it. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param fun An operation that accepts a cache entry and processes it. + * @param <K> Cache key object type. + * @param <V> Cache value object type. */ - public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) { + private static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) { foreach(cacheName, fun, null); } /** * @param cacheName Cache name. - * @param fun An operation that accepts a cache entry and processes it. + * @param fun An operation that accepts a cache entry and processes it. * @param keyFilter Cache keys filter. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param <K> Cache key object type. + * @param <V> Cache value object type. */ - public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun, - IgnitePredicate<K> keyFilter) { + protected static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun, + IgnitePredicate<K> keyFilter) { bcast(cacheName, () -> { Ignite ignite = Ignition.localIgnite(); IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName); @@ -405,7 +393,7 @@ public class CacheUtils { // Iterate over given partition. 
// Query returns an empty cursor if this partition is not stored on this node. for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, - (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) + (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) fun.accept(new CacheEntry<>(entry, cache)); } }); @@ -413,14 +401,14 @@ public class CacheUtils { /** * @param cacheName Cache name. - * @param fun An operation that accepts a cache entry and processes it. - * @param ignite Ignite. - * @param keysGen Keys generator. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param fun An operation that accepts a cache entry and processes it. + * @param ignite Ignite. + * @param keysGen Keys generator. + * @param <K> Cache key object type. + * @param <V> Cache value object type. */ public static <K, V> void update(String cacheName, Ignite ignite, - IgniteBiFunction<Ignite, Cache.Entry<K, V>, Stream<Cache.Entry<K, V>>> fun, IgniteSupplier<Set<K>> keysGen) { + IgniteBiFunction<Ignite, Cache.Entry<K, V>, Stream<Cache.Entry<K, V>>> fun, IgniteSupplier<Set<K>> keysGen) { bcast(cacheName, ignite, () -> { Ignite ig = Ignition.localIgnite(); IgniteCache<K, V> cache = ig.getOrCreateCache(cacheName); @@ -447,14 +435,14 @@ public class CacheUtils { /** * @param cacheName Cache name. - * @param fun An operation that accepts a cache entry and processes it. - * @param ignite Ignite. - * @param keysGen Keys generator. - * @param <K> Cache key object type. - * @param <V> Cache value object type. + * @param fun An operation that accepts a cache entry and processes it. + * @param ignite Ignite. + * @param keysGen Keys generator. + * @param <K> Cache key object type. + * @param <V> Cache value object type. 
*/ public static <K, V> void update(String cacheName, Ignite ignite, IgniteConsumer<Cache.Entry<K, V>> fun, - IgniteSupplier<Set<K>> keysGen) { + IgniteSupplier<Set<K>> keysGen) { bcast(cacheName, ignite, () -> { Ignite ig = Ignition.localIgnite(); IgniteCache<K, V> cache = ig.getOrCreateCache(cacheName); @@ -485,10 +473,10 @@ public class CacheUtils { * <b>Currently fold supports only commutative operations.<b/> * * @param cacheName Cache name. - * @param folder Fold function operating over cache entries. - * @param <K> Cache key object type. - * @param <V> Cache value object type. - * @param <A> Fold result type. + * @param folder Fold function operating over cache entries. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + * @param <A> Fold result type. * @return Fold operation result. */ public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder) { @@ -499,14 +487,14 @@ public class CacheUtils { * <b>Currently fold supports only commutative operations.<b/> * * @param cacheName Cache name. - * @param folder Fold function operating over cache entries. - * @param <K> Cache key object type. - * @param <V> Cache value object type. - * @param <A> Fold result type. + * @param folder Fold function operating over cache entries. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + * @param <A> Fold result type. * @return Fold operation result. */ public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder, - IgnitePredicate<K> keyFilter) { + IgnitePredicate<K> keyFilter) { return bcast(cacheName, () -> { Ignite ignite = Ignition.localIgnite(); IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName); @@ -526,7 +514,7 @@ public class CacheUtils { // Iterate over given partition. // Query returns an empty cursor if this partition is not stored on this node. 
for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, - (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) + (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) a = folder.apply(new CacheEntry<>(entry, cache), a); } @@ -537,34 +525,34 @@ public class CacheUtils { /** * Distributed version of fold operation. * - * @param cacheName Cache name. - * @param folder Folder. - * @param keyFilter Key filter. + * @param cacheName Cache name. + * @param folder Folder. + * @param keyFilter Key filter. * @param accumulator Accumulator. * @param zeroValSupp Zero value supplier. */ public static <K, V, A> A distributedFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder, - IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp) { + IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp) { return sparseFold(cacheName, folder, keyFilter, accumulator, zeroValSupp, null, null, 0, - false); + false); } /** * Sparse version of fold. This method also applicable to sparse zeroes. * - * @param cacheName Cache name. - * @param folder Folder. - * @param keyFilter Key filter. + * @param cacheName Cache name. + * @param folder Folder. + * @param keyFilter Key filter. * @param accumulator Accumulator. * @param zeroValSupp Zero value supplier. - * @param defVal Default value. - * @param defKey Default key. - * @param defValCnt Def value count. + * @param defVal Default value. + * @param defKey Default key. + * @param defValCnt Def value count. * @param isNilpotent Is nilpotent. 
*/ private static <K, V, A> A sparseFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder, - IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp, V defVal, K defKey, - long defValCnt, boolean isNilpotent) { + IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, IgniteSupplier<A> zeroValSupp, V defVal, K defKey, + long defValCnt, boolean isNilpotent) { A defRes = zeroValSupp.get(); @@ -591,7 +579,7 @@ public class CacheUtils { // Iterate over given partition. // Query returns an empty cursor if this partition is not stored on this node. for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, - (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) + (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) a = folder.apply(entry, a); } @@ -601,10 +589,10 @@ public class CacheUtils { } public static <K, V, A, W> A reduce(String cacheName, Ignite ignite, - IgniteTriFunction<W, Cache.Entry<K, V>, A, A> acc, - IgniteSupplier<W> supp, - IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb, - IgniteSupplier<A> zeroValSupp) { + IgniteTriFunction<W, Cache.Entry<K, V>, A, A> acc, + IgniteSupplier<W> supp, + IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb, + IgniteSupplier<A> zeroValSupp) { A defRes = zeroValSupp.get(); @@ -624,15 +612,15 @@ public class CacheUtils { } public static <K, V, A, W> A reduce(String cacheName, IgniteTriFunction<W, Cache.Entry<K, V>, A, A> acc, - IgniteSupplier<W> supp, - IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb, - IgniteSupplier<A> zeroValSupp) { + IgniteSupplier<W> supp, + IgniteSupplier<Iterable<Cache.Entry<K, V>>> entriesGen, IgniteBinaryOperator<A> comb, + IgniteSupplier<A> zeroValSupp) { return reduce(cacheName, Ignition.localIgnite(), acc, supp, entriesGen, comb, 
zeroValSupp); } /** * @param cacheName Cache name. - * @param run {@link Runnable} to broadcast to cache nodes for given cache name. + * @param run {@link Runnable} to broadcast to cache nodes for given cache name. */ public static void bcast(String cacheName, Ignite ignite, IgniteRunnable run) { ignite.compute(ignite.cluster().forDataNodes(cacheName)).broadcast(run); @@ -640,8 +628,9 @@ public class CacheUtils { /** * Broadcast runnable to data nodes of given cache. + * * @param cacheName Cache name. - * @param run Runnable. + * @param run Runnable. */ public static void bcast(String cacheName, IgniteRunnable run) { bcast(cacheName, ignite(), run); @@ -649,8 +638,8 @@ public class CacheUtils { /** * @param cacheName Cache name. - * @param call {@link IgniteCallable} to broadcast to cache nodes for given cache name. - * @param <A> Type returned by the callable. + * @param call {@link IgniteCallable} to broadcast to cache nodes for given cache name. + * @param <A> Type returned by the callable. */ public static <A> Collection<A> bcast(String cacheName, IgniteCallable<A> call) { return bcast(cacheName, ignite(), call); @@ -658,13 +647,42 @@ public class CacheUtils { /** * Broadcast callable to data nodes of given cache. + * * @param cacheName Cache name. - * @param ignite Ignite instance. - * @param call Callable to broadcast. - * @param <A> Type of callable result. + * @param ignite Ignite instance. + * @param call Callable to broadcast. + * @param <A> Type of callable result. * @return Results of callable from each node. */ public static <A> Collection<A> bcast(String cacheName, Ignite ignite, IgniteCallable<A> call) { return ignite.compute(ignite.cluster().forDataNodes(cacheName)).broadcast(call); } + + /** + * @param vectorUuid Matrix UUID. + * @param mapper Mapping {@link IgniteFunction}. 
+ */ + @SuppressWarnings("unchecked") + public static <K, V> void sparseMapForVector(UUID vectorUuid, IgniteDoubleFunction<V> mapper, String cacheName) { + A.notNull(vectorUuid, "vectorUuid"); + A.notNull(cacheName, "cacheName"); + A.notNull(mapper, "mapper"); + + foreach(cacheName, (CacheEntry<K, V> ce) -> { + K k = ce.entry().getKey(); + + V v = ce.entry().getValue(); + + if (v instanceof VectorBlockEntry) { + VectorBlockEntry entry = (VectorBlockEntry) v; + + for (int i = 0; i < entry.size(); i++) entry.set(i, (Double) mapper.apply(entry.get(i))); + + ce.cache().put(k, (V) entry); + } else { + V mappingRes = mapper.apply((Double) v); + ce.cache().put(k, mappingRes); + } + }, sparseKeyFilter(vectorUuid)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java deleted file mode 100644 index 091b325..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.math.distributed.keys; - -import org.apache.ignite.internal.util.lang.IgnitePair; -import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix; - -/** - * Cache key for blocks in {@link SparseBlockDistributedMatrix}. - * - * TODO: check if using {@link IgnitePair} will be better for block id. - */ -public interface BlockMatrixKey extends MatrixCacheKey { - /** - * @return block row id. - */ - public long blockRowId(); - - /** - * @return block col id. - */ - public long blockColId(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java new file mode 100644 index 0000000..d99ea48 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/DataStructureCacheKey.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.math.distributed.keys; + +import java.util.UUID; + +/** + * Base matrix cache key. + */ +public interface DataStructureCacheKey { + /** + * @return matrix id. + */ + public UUID dataStructureId(); + + /** + * @return affinity key. + */ + public Object affinityKey(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java new file mode 100644 index 0000000..9c76568 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixBlockKey.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.math.distributed.keys; + +import org.apache.ignite.internal.util.lang.IgnitePair; +import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix; + +/** + * Cache key for blocks in {@link SparseBlockDistributedMatrix}. + * + * TODO: check if using {@link IgnitePair} will be better for block id. + */ +public interface MatrixBlockKey extends DataStructureCacheKey { + /** + * @return block row id. + */ + public long blockRowId(); + + /** + * @return block col id. + */ + public long blockColId(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java deleted file mode 100644 index 0242560..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.math.distributed.keys; - -import java.util.UUID; - -/** - * Base matrix cache key. - */ -public interface MatrixCacheKey { - /** - * @return matrix id. - */ - public UUID matrixId(); - - /** - * @return affinity key. - */ - public Object affinityKey(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java index 168f49f..78af2e8 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java @@ -22,7 +22,7 @@ import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; /** * Cache key for {@link SparseDistributedMatrix}. */ -public interface RowColMatrixKey extends MatrixCacheKey { +public interface RowColMatrixKey extends DataStructureCacheKey { /** * Return index value(blockId, Row/Col index, etc.) 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java new file mode 100644 index 0000000..32af965 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/VectorBlockKey.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.math.distributed.keys; + +import org.apache.ignite.internal.util.lang.IgnitePair; +import org.apache.ignite.ml.math.impls.vector.SparseBlockDistributedVector; + +/** + * Cache key for blocks in {@link SparseBlockDistributedVector}. + * + * TODO: check if using {@link IgnitePair} will be better for block id. + */ +public interface VectorBlockKey extends DataStructureCacheKey { + /** + * @return block id. 
+ */ + public long blockId(); + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java deleted file mode 100644 index cc8c488..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.ml.math.distributed.keys.impl; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.UUID; -import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.binary.BinaryRawReader; -import org.apache.ignite.binary.BinaryRawWriter; -import org.apache.ignite.binary.BinaryReader; -import org.apache.ignite.binary.BinaryWriter; -import org.apache.ignite.binary.Binarylizable; -import org.apache.ignite.internal.binary.BinaryUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.ml.math.impls.matrix.BlockEntry; -import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix; -import org.jetbrains.annotations.Nullable; - -/** - * Key implementation for {@link BlockEntry} using for {@link SparseBlockDistributedMatrix}. - */ -public class BlockMatrixKey implements org.apache.ignite.ml.math.distributed.keys.BlockMatrixKey, Externalizable, Binarylizable { - /** */ - private static final long serialVersionUID = 0L; - /** Block row ID */ - private long blockIdRow; - /** Block col ID */ - private long blockIdCol; - /** Matrix ID */ - private UUID matrixUuid; - /** Block affinity key. */ - private IgniteUuid affinityKey; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public BlockMatrixKey() { - // No-op. - } - - /** - * Construct matrix block key. - * - * @param matrixUuid Matrix uuid. - * @param affinityKey Affinity key. 
- */ - public BlockMatrixKey(long rowId, long colId, UUID matrixUuid, @Nullable IgniteUuid affinityKey) { - assert rowId >= 0; - assert colId >= 0; - assert matrixUuid != null; - - this.blockIdRow = rowId; - this.blockIdCol = colId; - this.matrixUuid = matrixUuid; - this.affinityKey = affinityKey; - } - - /** {@inheritDoc} */ - @Override public long blockRowId() { - return blockIdRow; - } - - /** {@inheritDoc} */ - @Override public long blockColId() { - return blockIdCol; - } - - /** {@inheritDoc} */ - @Override public UUID matrixId() { - return matrixUuid; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid affinityKey() { - return affinityKey; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(matrixUuid); - U.writeGridUuid(out, affinityKey); - out.writeLong(blockIdRow); - out.writeLong(blockIdCol); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - matrixUuid = (UUID)in.readObject(); - affinityKey = U.readGridUuid(in); - blockIdRow = in.readLong(); - blockIdCol = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { - BinaryRawWriter out = writer.rawWriter(); - - out.writeUuid(matrixUuid); - BinaryUtils.writeIgniteUuid(out, affinityKey); - out.writeLong(blockIdRow); - out.writeLong(blockIdCol); - } - - /** {@inheritDoc} */ - @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { - BinaryRawReader in = reader.rawReader(); - - matrixUuid = in.readUuid(); - affinityKey = BinaryUtils.readIgniteUuid(in); - blockIdRow = in.readLong(); - blockIdCol = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = 37; - - res += res * 37 + blockIdCol; - res += res * 37 + blockIdRow; - res += res * 37 + matrixUuid.hashCode(); - - return res; - } - - /** {@inheritDoc} */ - @Override 
public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || obj.getClass() != getClass()) - return false; - - BlockMatrixKey that = (BlockMatrixKey)obj; - - return blockIdRow == that.blockIdRow && blockIdCol == that.blockIdCol && matrixUuid.equals(that.matrixUuid) - && F.eq(affinityKey, that.affinityKey); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(BlockMatrixKey.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java new file mode 100644 index 0000000..9e8d81e --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/MatrixBlockKey.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.math.distributed.keys.impl; + +import org.apache.ignite.binary.*; +import org.apache.ignite.internal.binary.BinaryUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.ml.math.impls.matrix.MatrixBlockEntry; +import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; + +/** + * Key implementation for {@link MatrixBlockEntry} used by {@link SparseBlockDistributedMatrix}. + */ +public class MatrixBlockKey implements org.apache.ignite.ml.math.distributed.keys.MatrixBlockKey, Externalizable, Binarylizable { + /** */ + private static final long serialVersionUID = 0L; + /** Block row ID */ + private long blockIdRow; + /** Block col ID */ + private long blockIdCol; + /** Matrix ID */ + private UUID matrixUuid; + /** Block affinity key. */ + private UUID affinityKey; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public MatrixBlockKey() { + // No-op. + } + + /** + * Construct matrix block key. + * + * @param matrixUuid Matrix uuid. + * @param affinityKey Affinity key. 
+ */ + public MatrixBlockKey(long rowId, long colId, UUID matrixUuid, @Nullable UUID affinityKey) { + assert rowId >= 0; + assert colId >= 0; + assert matrixUuid != null; + + this.blockIdRow = rowId; + this.blockIdCol = colId; + this.matrixUuid = matrixUuid; + this.affinityKey = affinityKey; + } + + /** {@inheritDoc} */ + @Override public long blockRowId() { + return blockIdRow; + } + + /** {@inheritDoc} */ + @Override public long blockColId() { + return blockIdCol; + } + + /** {@inheritDoc} */ + @Override public UUID dataStructureId() { + return matrixUuid; + } + + /** {@inheritDoc} */ + @Override public UUID affinityKey() { + return affinityKey; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(matrixUuid); + out.writeObject(affinityKey); + out.writeLong(blockIdRow); + out.writeLong(blockIdCol); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + matrixUuid = (UUID)in.readObject(); + affinityKey = (UUID)in.readObject(); + blockIdRow = in.readLong(); + blockIdCol = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + BinaryRawWriter out = writer.rawWriter(); + + out.writeUuid(matrixUuid); + out.writeUuid(affinityKey); + out.writeLong(blockIdRow); + out.writeLong(blockIdCol); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + BinaryRawReader in = reader.rawReader(); + + matrixUuid = in.readUuid(); + affinityKey = in.readUuid(); + blockIdRow = in.readLong(); + blockIdCol = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = 37; + + res += res * 37 + blockIdCol; + res += res * 37 + blockIdRow; + res += res * 37 + matrixUuid.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if 
(obj == this) + return true; + + if (obj == null || obj.getClass() != getClass()) + return false; + + MatrixBlockKey that = (MatrixBlockKey)obj; + + return blockIdRow == that.blockIdRow && blockIdCol == that.blockIdCol && matrixUuid.equals(that.matrixUuid) + && F.eq(affinityKey, that.affinityKey); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MatrixBlockKey.class, this); + } + + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java index aa5e0ad..980d433 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java @@ -17,17 +17,18 @@ package org.apache.ignite.ml.math.distributed.keys.impl; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.UUID; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey; import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; + /** * Key implementation for {@link SparseDistributedMatrix}. 
*/ @@ -65,7 +66,7 @@ public class SparseMatrixKey implements RowColMatrixKey, Externalizable { } /** {@inheritDoc} */ - @Override public UUID matrixId() { + @Override public UUID dataStructureId() { return matrixId; } @@ -76,7 +77,6 @@ public class SparseMatrixKey implements RowColMatrixKey, Externalizable { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { -// U.writeGridUuid(out, matrixId); out.writeObject(matrixId); out.writeObject(affinityKey); out.writeInt(idx); http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java new file mode 100644 index 0000000..6052010 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/VectorBlockKey.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.math.distributed.keys.impl; + +import org.apache.ignite.binary.*; +import org.apache.ignite.internal.binary.BinaryUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.ml.math.impls.vector.SparseBlockDistributedVector; +import org.apache.ignite.ml.math.impls.vector.VectorBlockEntry; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; + +/** + * Key implementation for {@link VectorBlockEntry} used by {@link SparseBlockDistributedVector}. + */ +public class VectorBlockKey implements org.apache.ignite.ml.math.distributed.keys.VectorBlockKey, Externalizable, Binarylizable { + /** */ + private static final long serialVersionUID = 0L; + /** Block ID */ + private long blockId; + /** Vector ID */ + private UUID vectorUuid; + /** Block affinity key. */ + private UUID affinityKey; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public VectorBlockKey() { + // No-op. + } + + /** + * Construct vector block key. + * + * @param vectorUuid Vector uuid. + * @param affinityKey Affinity key. 
+ */ + public VectorBlockKey(long blockId, UUID vectorUuid, @Nullable UUID affinityKey) { + assert blockId >= 0; + assert vectorUuid != null; + + this.blockId = blockId; + this.vectorUuid = vectorUuid; + this.affinityKey = affinityKey; + } + + /** {@inheritDoc} */ + @Override public long blockId() { + return blockId; + } + + + /** {@inheritDoc} */ + @Override public UUID dataStructureId() { + return vectorUuid; + } + + /** {@inheritDoc} */ + @Override public UUID affinityKey() { + return affinityKey; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(vectorUuid); + out.writeObject(affinityKey); + out.writeLong(blockId); + + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + vectorUuid = (UUID)in.readObject(); + affinityKey = (UUID)in.readObject(); + blockId = in.readLong(); + + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + BinaryRawWriter out = writer.rawWriter(); + + out.writeUuid(vectorUuid); + out.writeUuid(affinityKey); + out.writeLong(blockId); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + BinaryRawReader in = reader.rawReader(); + + vectorUuid = in.readUuid(); + affinityKey = in.readUuid(); + blockId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = 37; + + res += res * 37 + blockId; + res += res * 37 + vectorUuid.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || obj.getClass() != getClass()) + return false; + + VectorBlockKey that = (VectorBlockKey)obj; + + return blockId == that.blockId && vectorUuid.equals(that.vectorUuid) + && F.eq(affinityKey, that.affinityKey); + } + + /** {@inheritDoc} */ + @Override public 
String toString() { + return S.toString(VectorBlockKey.class, this); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java index 0b4ad12..ce534bd 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/functions/Functions.java @@ -17,10 +17,11 @@ package org.apache.ignite.ml.math.functions; +import org.apache.ignite.lang.IgniteBiTuple; + import java.util.Comparator; import java.util.List; import java.util.function.BiFunction; -import org.apache.ignite.lang.IgniteBiTuple; /** * Compatibility with Apache Mahout. http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java index 06fb34c..89f567e 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java @@ -43,6 +43,7 @@ import org.apache.ignite.ml.math.functions.IgniteTriFunction; import org.apache.ignite.ml.math.functions.IntIntToDoubleFunction; import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector; import org.apache.ignite.ml.math.impls.vector.MatrixVectorView; +import org.apache.ignite.ml.math.util.MatrixUtil; /** * This class provides a helper implementation of the {@link Matrix} @@ -282,7 +283,7 @@ 
public abstract class AbstractMatrix implements Matrix { * * @param row Row index. */ - private void checkRowIndex(int row) { + void checkRowIndex(int row) { if (row < 0 || row >= rowSize()) throw new RowIndexException(row); } @@ -292,7 +293,7 @@ public abstract class AbstractMatrix implements Matrix { * * @param col Column index. */ - private void checkColumnIndex(int col) { + void checkColumnIndex(int col) { if (col < 0 || col >= columnSize()) throw new ColumnIndexException(col); } @@ -303,7 +304,7 @@ public abstract class AbstractMatrix implements Matrix { * @param row Row index. * @param col Column index. */ - protected void checkIndex(int row, int col) { + private void checkIndex(int row, int col) { checkRowIndex(row); checkColumnIndex(col); } @@ -739,11 +740,12 @@ public abstract class AbstractMatrix implements Matrix { /** {@inheritDoc} */ @Override public Vector getCol(int col) { checkColumnIndex(col); - - Vector res = new DenseLocalOnHeapVector(rowSize()); + Vector res; + if (isDistributed()) res = MatrixUtil.likeVector(this, rowSize()); + else res = new DenseLocalOnHeapVector(rowSize()); for (int i = 0; i < rowSize(); i++) - res.setX(i, getX(i,col)); + res.setX(i, getX(i, col)); return res; } @@ -974,4 +976,14 @@ public abstract class AbstractMatrix implements Matrix { @Override public void compute(int row, int col, IgniteTriFunction<Integer, Integer, Double, Double> f) { setX(row, col, f.apply(row, col, getX(row, col))); } + + + protected int getMaxAmountOfColumns(double[][] data) { + int maxAmountOfColumns = 0; + + for (int i = 0; i < data.length; i++) + maxAmountOfColumns = Math.max(maxAmountOfColumns, data[i].length); + + return maxAmountOfColumns; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java ---------------------------------------------------------------------- diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java deleted file mode 100644 index 47f07ce..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/BlockEntry.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.math.impls.matrix; - -import org.apache.ignite.ml.math.Matrix; - -/** - * Block for {@link SparseBlockDistributedMatrix}. - */ -public final class BlockEntry extends SparseLocalOnHeapMatrix { - /** Max block size. */ - public static final int MAX_BLOCK_SIZE = 32; - - /** */ - public BlockEntry() { - // No-op. 
- } - - /** */ - public BlockEntry(int row, int col) { - super(row, col); - - assert col <= MAX_BLOCK_SIZE; - assert row <= MAX_BLOCK_SIZE; - } - - /** */ - public BlockEntry(Matrix mtx) { - assert mtx.columnSize() <= MAX_BLOCK_SIZE; - assert mtx.rowSize() <= MAX_BLOCK_SIZE; - - setStorage(mtx.getStorage()); - } - -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java new file mode 100644 index 0000000..a2d13a1 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/MatrixBlockEntry.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.math.impls.matrix; + +import org.apache.ignite.ml.math.Matrix; + +/** + * Block for {@link SparseBlockDistributedMatrix}. + */ +public final class MatrixBlockEntry extends SparseLocalOnHeapMatrix { + /** Max block size. 
*/ + public static final int MAX_BLOCK_SIZE = 32; + + /** */ + public MatrixBlockEntry() { + // No-op. + } + + /** */ + public MatrixBlockEntry(int row, int col) { + super(row, col); + + assert col <= MAX_BLOCK_SIZE; + assert row <= MAX_BLOCK_SIZE; + } + + /** */ + public MatrixBlockEntry(Matrix mtx) { + assert mtx.columnSize() <= MAX_BLOCK_SIZE; + assert mtx.rowSize() <= MAX_BLOCK_SIZE; + + setStorage(mtx.getStorage()); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java index e829168..ea9fb8c 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseBlockDistributedMatrix.java @@ -17,10 +17,6 @@ package org.apache.ignite.ml.math.impls.matrix; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; @@ -31,16 +27,25 @@ import org.apache.ignite.ml.math.Matrix; import org.apache.ignite.ml.math.StorageConstants; import org.apache.ignite.ml.math.Vector; import org.apache.ignite.ml.math.distributed.CacheUtils; -import org.apache.ignite.ml.math.distributed.keys.impl.BlockMatrixKey; +import org.apache.ignite.ml.math.distributed.keys.impl.MatrixBlockKey; +import org.apache.ignite.ml.math.distributed.keys.impl.VectorBlockKey; import org.apache.ignite.ml.math.exceptions.CardinalityException; -import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException; import 
org.apache.ignite.ml.math.functions.IgniteDoubleFunction; import org.apache.ignite.ml.math.impls.storage.matrix.BlockMatrixStorage; +import org.apache.ignite.ml.math.impls.storage.matrix.BlockVectorStorage; +import org.apache.ignite.ml.math.impls.vector.SparseBlockDistributedVector; +import org.apache.ignite.ml.math.impls.vector.SparseDistributedVector; +import org.apache.ignite.ml.math.impls.vector.VectorBlockEntry; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; /** - * Sparse block distributed matrix. This matrix represented by blocks 32x32 {@link BlockEntry}. + * Sparse block distributed matrix. This matrix represented by blocks 32x32 {@link MatrixBlockEntry}. * - * Using separate cache with keys {@link BlockMatrixKey} and values {@link BlockEntry}. + * Using separate cache with keys {@link MatrixBlockKey} and values {@link MatrixBlockEntry}. */ public class SparseBlockDistributedMatrix extends AbstractMatrix implements StorageConstants { /** @@ -62,6 +67,18 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor } /** + * @param data Data to fill the matrix + */ + public SparseBlockDistributedMatrix(double[][] data) { + assert data.length > 0; + setStorage(new BlockMatrixStorage(data.length, getMaxAmountOfColumns(data))); + + for (int i = 0; i < data.length; i++) + for (int j = 0; j < data[i].length; j++) + storage().set(i, j, data[i][j]); + } + + /** * Return the same matrix with updates values (broken contract). * * @param d Value to divide to. 
@@ -100,22 +117,22 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor throw new CardinalityException(columnSize(), mtx.rowSize()); SparseBlockDistributedMatrix matrixA = this; - SparseBlockDistributedMatrix matrixB = (SparseBlockDistributedMatrix)mtx; + SparseBlockDistributedMatrix matrixB = (SparseBlockDistributedMatrix) mtx; String cacheName = this.storage().cacheName(); SparseBlockDistributedMatrix matrixC = new SparseBlockDistributedMatrix(matrixA.rowSize(), matrixB.columnSize()); CacheUtils.bcast(cacheName, () -> { Ignite ignite = Ignition.localIgnite(); - Affinity<BlockMatrixKey> affinity = ignite.affinity(cacheName); + Affinity<MatrixBlockKey> affinity = ignite.affinity(cacheName); - IgniteCache<BlockMatrixKey, BlockEntry> cache = ignite.getOrCreateCache(cacheName); + IgniteCache<MatrixBlockKey, MatrixBlockEntry> cache = ignite.getOrCreateCache(cacheName); ClusterNode locNode = ignite.cluster().localNode(); BlockMatrixStorage storageC = matrixC.storage(); - Map<ClusterNode, Collection<BlockMatrixKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys()); - Collection<BlockMatrixKey> locKeys = keysCToNodes.get(locNode); + Map<ClusterNode, Collection<MatrixBlockKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys()); + Collection<MatrixBlockKey> locKeys = keysCToNodes.get(locNode); if (locKeys == null) return; @@ -128,18 +145,18 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor IgnitePair<Long> newBlockId = new IgnitePair<>(newBlockIdRow, newBlockIdCol); - BlockEntry blockC = null; + MatrixBlockEntry blockC = null; - List<BlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockId); - List<BlockEntry> bCol = matrixB.storage().getColForBlock(newBlockId); + List<MatrixBlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockId); + List<MatrixBlockEntry> bCol = matrixB.storage().getColForBlock(newBlockId); for (int i = 0; i < aRow.size(); i++) { - BlockEntry blockA = 
aRow.get(i); - BlockEntry blockB = bCol.get(i); + MatrixBlockEntry blockA = aRow.get(i); + MatrixBlockEntry blockB = bCol.get(i); - BlockEntry tmpBlock = new BlockEntry(blockA.times(blockB)); + MatrixBlockEntry tmpBlock = new MatrixBlockEntry(blockA.times(blockB)); - blockC = blockC == null ? tmpBlock : new BlockEntry(blockC.plus(tmpBlock)); + blockC = blockC == null ? tmpBlock : new MatrixBlockEntry(blockC.plus(tmpBlock)); } cache.put(storageC.getCacheKey(newBlockIdRow, newBlockIdCol), blockC); @@ -149,6 +166,90 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor return matrixC; } + + /** + * {@inheritDoc} + */ + @SuppressWarnings({"unchecked"}) + @Override public Vector times(final Vector vec) { + if (vec == null) + throw new IllegalArgumentException("The vector should be not null."); + + if (columnSize() != vec.size()) + throw new CardinalityException(columnSize(), vec.size()); + + SparseBlockDistributedMatrix matrixA = this; + SparseBlockDistributedVector vectorB = (SparseBlockDistributedVector) vec; + + + String cacheName = this.storage().cacheName(); + SparseBlockDistributedVector vectorC = new SparseBlockDistributedVector(matrixA.rowSize()); + + CacheUtils.bcast(cacheName, () -> { + Ignite ignite = Ignition.localIgnite(); + Affinity<VectorBlockKey> affinity = ignite.affinity(cacheName); + + IgniteCache<VectorBlockKey, VectorBlockEntry> cache = ignite.getOrCreateCache(cacheName); + ClusterNode locNode = ignite.cluster().localNode(); + + BlockVectorStorage storageC = vectorC.storage(); + + Map<ClusterNode, Collection<VectorBlockKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys()); + Collection<VectorBlockKey> locKeys = keysCToNodes.get(locNode); + + if (locKeys == null) + return; + + // compute Cij locally on each node + // TODO: IGNITE:5114, exec in parallel + locKeys.forEach(key -> { + long newBlockId = key.blockId(); + + + IgnitePair<Long> newBlockIdForMtx = new IgnitePair<>(newBlockId, 0L); + + 
VectorBlockEntry blockC = null; + + List<MatrixBlockEntry> aRow = matrixA.storage().getRowForBlock(newBlockIdForMtx); + List<VectorBlockEntry> bCol = vectorB.storage().getColForBlock(newBlockId); + + for (int i = 0; i < aRow.size(); i++) { + MatrixBlockEntry blockA = aRow.get(i); + VectorBlockEntry blockB = bCol.get(i); + + VectorBlockEntry tmpBlock = new VectorBlockEntry(blockA.times(blockB)); + + blockC = blockC == null ? tmpBlock : new VectorBlockEntry(blockC.plus(tmpBlock)); + } + + cache.put(storageC.getCacheKey(newBlockId), blockC); + }); + }); + return vectorC; + } + + /** {@inheritDoc} */ + @Override public Vector getCol(int col) { + checkColumnIndex(col); + + Vector res = new SparseDistributedVector(rowSize()); + + for (int i = 0; i < rowSize(); i++) + res.setX(i, getX(i, col)); + return res; + } + + /** {@inheritDoc} */ + @Override public Vector getRow(int row) { + checkRowIndex(row); + + Vector res = new SparseDistributedVector(columnSize()); + + for (int i = 0; i < columnSize(); i++) + res.setX(i, getX(row, i)); + return res; + } + /** {@inheritDoc} */ @Override public Matrix assign(double val) { return mapOverValues(v -> val); @@ -176,7 +277,11 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor /** {@inheritDoc} */ @Override public Matrix copy() { - throw new UnsupportedOperationException(); + Matrix cp = like(rowSize(), columnSize()); + + cp.assign(this); + + return cp; } /** {@inheritDoc} */ @@ -186,12 +291,12 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor /** {@inheritDoc} */ @Override public Vector likeVector(int crd) { - throw new UnsupportedOperationException(); + return new SparseBlockDistributedVector(crd); } /** */ private UUID getUUID() { - return ((BlockMatrixStorage)getStorage()).getUUID(); + return ((BlockMatrixStorage) getStorage()).getUUID(); } /** @@ -208,6 +313,6 @@ public class SparseBlockDistributedMatrix extends AbstractMatrix implements Stor * */ private 
BlockMatrixStorage storage() { - return (BlockMatrixStorage)getStorage(); + return (BlockMatrixStorage) getStorage(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b0a86018/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java index 594aebc..497241d 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java @@ -17,24 +17,24 @@ package org.apache.ignite.ml.math.impls.matrix; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.Matrix; import org.apache.ignite.ml.math.StorageConstants; import org.apache.ignite.ml.math.Vector; import org.apache.ignite.ml.math.distributed.CacheUtils; import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey; import org.apache.ignite.ml.math.exceptions.CardinalityException; -import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException; import org.apache.ignite.ml.math.functions.IgniteDoubleFunction; import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage; +import org.apache.ignite.ml.math.impls.storage.vector.SparseDistributedVectorStorage; +import org.apache.ignite.ml.math.impls.vector.SparseDistributedVector; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; /** * Sparse distributed matrix 
implementation based on data grid. @@ -68,6 +68,28 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo assertStorageMode(stoMode); setStorage(new SparseDistributedMatrixStorage(rows, cols, stoMode, acsMode)); + + } + + /** + * @param data Data to fill the matrix + */ + public SparseDistributedMatrix(double[][] data) { + assert data.length > 0; + setStorage(new SparseDistributedMatrixStorage(data.length, getMaxAmountOfColumns(data), StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE)); + + for (int i = 0; i < data.length; i++) + for (int j = 0; j < data[i].length; j++) + storage().set(i,j,data[i][j]); + } + + + /** + * @param rows Amount of rows in the matrix. + * @param cols Amount of columns in the matrix. + */ + public SparseDistributedMatrix(int rows, int cols) { + this(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE); } /** */ @@ -122,7 +144,6 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo Ignite ignite = Ignition.localIgnite(); Affinity<RowColMatrixKey> affinity = ignite.affinity(cacheName); - IgniteCache<RowColMatrixKey, BlockEntry> cache = ignite.getOrCreateCache(cacheName); ClusterNode locNode = ignite.cluster().localNode(); SparseDistributedMatrixStorage storageC = matrixC.storage(); @@ -141,17 +162,17 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo int idx = key.index(); if (isRowMode){ - Vector Aik = matrixA.getCol(idx); + Vector Aik = matrixA.getRow(idx); - for (int i = 0; i < columnSize(); i++) { - Vector Bkj = matrixB.getRow(i); + for (int i = 0; i < matrixB.columnSize(); i++) { + Vector Bkj = matrixB.getCol(i); matrixC.set(idx, i, Aik.times(Bkj).sum()); } } else { - Vector Bkj = matrixB.getRow(idx); + Vector Bkj = matrixB.getCol(idx); - for (int i = 0; i < rowSize(); i++) { - Vector Aik = matrixA.getCol(i); + for (int i = 0; i < matrixA.rowSize(); i++) { + Vector Aik = matrixA.getRow(i); 
matrixC.set(idx, i, Aik.times(Bkj).sum()); } } @@ -161,6 +182,49 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo return matrixC; } + + /** {@inheritDoc} */ + @Override public Vector times(Vector vec) { + if (vec == null) + throw new IllegalArgumentException("The vector should be not null."); + + if (columnSize() != vec.size()) + throw new CardinalityException(columnSize(), vec.size()); + + SparseDistributedMatrix matrixA = this; + SparseDistributedVector vectorB = (SparseDistributedVector) vec; + + String cacheName = storage().cacheName(); + int rows = this.rowSize(); + + SparseDistributedVector vectorC = (SparseDistributedVector) likeVector(rows); + + CacheUtils.bcast(cacheName, () -> { + Ignite ignite = Ignition.localIgnite(); + Affinity<RowColMatrixKey> affinity = ignite.affinity(cacheName); + + ClusterNode locNode = ignite.cluster().localNode(); + + SparseDistributedVectorStorage storageC = vectorC.storage(); + + Map<ClusterNode, Collection<RowColMatrixKey>> keysCToNodes = affinity.mapKeysToNodes(storageC.getAllKeys()); + Collection<RowColMatrixKey> locKeys = keysCToNodes.get(locNode); + + if (locKeys == null) + return; + + // compute Cij locally on each node + // TODO: IGNITE:5114, exec in parallel + locKeys.forEach(key -> { + int idx = key.index(); + Vector Aik = matrixA.getRow(idx); + vectorC.set(idx, Aik.times(vectorB).sum()); + }); + }); + + return vectorC; + } + /** {@inheritDoc} */ @Override public Matrix assign(double val) { return mapOverValues(v -> val); @@ -198,17 +262,23 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo /** {@inheritDoc} */ @Override public Matrix copy() { - throw new UnsupportedOperationException(); + Matrix cp = like(rowSize(), columnSize()); + + cp.assign(this); + + return cp; } /** {@inheritDoc} */ @Override public Matrix like(int rows, int cols) { - return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode()); + 
if(storage()==null) return new SparseDistributedMatrix(rows, cols); + else return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode()); + } /** {@inheritDoc} */ @Override public Vector likeVector(int crd) { - throw new UnsupportedOperationException(); + return new SparseDistributedVector(crd, StorageConstants.RANDOM_ACCESS_MODE); } /** */