This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 15df58fe0b [SYSTEMDS-3467] Support of MULTI_BLOCK Spark for
countDistinct()
15df58fe0b is described below
commit 15df58fe0b4a9ab8290a34b57e08850c0e5b5f05
Author: Badrul Chowdhury <[email protected]>
AuthorDate: Fri Nov 18 17:17:06 2022 -0800
[SYSTEMDS-3467] Support of MULTI_BLOCK Spark for countDistinct()
This patch adds support for running MULTI_BLOCK aggregations
for the countDistinct() builtin on the Spark backend.
The implementation augments CountDistinctFunctionSketch
with an implementation of the union() function.
Closes #1734
---
.../sketch/countdistinct/BitMapValueCombiner.java | 41 +++++++++++++++
.../countdistinct/CountDistinctFunctionSketch.java | 58 ++++++++++++++++++++--
.../countDistinct/CountDistinctRowColBase.java | 7 +++
3 files changed, 102 insertions(+), 4 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/BitMapValueCombiner.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/BitMapValueCombiner.java
new file mode 100644
index 0000000000..5799f86ba8
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/BitMapValueCombiner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data.sketch.countdistinct;
+
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+public class BitMapValueCombiner implements BinaryOperator<Set<Long>> { // Merge function that combines two Set<Long> values into their union
+
+ @Override
+ public Set<Long> apply(Set<Long> set0, Set<Long> set1) { // Always returns one of the two argument instances, never a fresh set
+ if (set0.isEmpty()) { // Left side contributes nothing: return set1 as-is (no copy is made)
+ return set1;
+ }
+
+ if (set1.isEmpty()) { // Right side contributes nothing: return set0 as-is (no copy is made)
+ return set0;
+ }
+
+ // Set union is commutative, so merging set1 into set0 yields the same contents as the reverse
+ set0.addAll(set1); // Mutates set0 in place; set1 is left unchanged
+ return set0;
+ }
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
index efdcfa69a6..79c6daf39a 100644
---
a/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
+++
b/src/main/java/org/apache/sysds/runtime/matrix/data/sketch/countdistinct/CountDistinctFunctionSketch.java
@@ -19,7 +19,6 @@
package org.apache.sysds.runtime.matrix.data.sketch.countdistinct;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.hops.OptimizerUtils;
@@ -31,7 +30,10 @@ import org.apache.sysds.runtime.matrix.operators.Operator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.OptionalInt;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class CountDistinctFunctionSketch extends CountDistinctSketch {
@@ -110,7 +112,7 @@ public class CountDistinctFunctionSketch extends
CountDistinctSketch {
}
}
- MatrixBlock blkOutCorr = serializeInputMatrixBlock(bitMap,
maxColumns);
+ MatrixBlock blkOutCorr = serialize(bitMap, maxColumns);
// The sketch contains all relevant info, so the input matrix
can be discarded at this point
return new CorrMatrixBlock(blkIn, blkOutCorr);
@@ -121,7 +123,7 @@ public class CountDistinctFunctionSketch extends
CountDistinctSketch {
return kMask & (n >> startingIndex);
}
- private MatrixBlock serializeInputMatrixBlock(Map<Short, Set<Long>>
bitMap, int maxWidth) {
+ private MatrixBlock serialize(Map<Short, Set<Long>> bitMap, int
maxWidth) {
// Each row in output matrix corresponds to a key and each
column to a fraction value for that key.
// The first column will store the exponent value itself:
@@ -150,9 +152,57 @@ public class CountDistinctFunctionSketch extends
CountDistinctSketch {
return blkOut;
}
+ private Map<Short, Set<Long>> deserialize(MatrixBlock blkIn) { // Rebuilds the key -> value-set map from the rows of blkIn
+ int R = blkIn.getNumRows(); // Every row of the block is decoded
+ Map<Short, Set<Long>> bitMap = new HashMap<>();
+
+ // row_i: [exponent_i, N_i, fraction_i0, fraction_i1, ..,
fraction_iN, 0, .., 0]
+ for (int i=0; i<R; ++i) {
+ short key = (short) blkIn.getValue(i, 0); // Column 0 holds the key, narrowed to short
+ Set<Long> fractions = bitMap.getOrDefault(key, new
HashSet<>()); // Reuse the set already collected for this key, if an earlier row had the same key
+
+ int C = (int) blkIn.getValue(i, 1); // Column 1 holds the number of value cells to read from this row
+ int j = 0;
+ while (j < C) {
+ long fraction = (long) blkIn.getValue(i, j + 2); // Values start at column 2; cells past the first C are not read
+ fractions.add(fraction);
+ ++j;
+ }
+
+ bitMap.put(key, fractions); // Stores the set back; needed when getOrDefault returned the fresh default set
+ }
+
+ return bitMap;
+ }
+
@Override
public CorrMatrixBlock union(CorrMatrixBlock arg0, CorrMatrixBlock
arg1) {
- throw new NotImplementedException("MULTI_BLOCK aggregation is
not supported yet");
+ MatrixBlock corr0 = arg0.getCorrection(); // The correction block of arg0 is decoded as a serialized sketch
+ Map<Short, Set<Long>> bitMap0 = deserialize(corr0);
+
+ MatrixBlock corr1 = arg1.getCorrection(); // Same decoding for the second operand
+ Map<Short, Set<Long>> bitMap1 = deserialize(corr1);
+
+ // Map putAll() is not suitable here as it will replace Map
values for identical keys.
+ // We will use a custom combiner with stream() and collect()
instead.
+ Map<Short, Set<Long>> bitMapOut =
+ Stream.concat(bitMap0.entrySet().stream(),
bitMap1.entrySet().stream())
+ .collect(Collectors.toMap(
+
Map.Entry::getKey,
+
Map.Entry::getValue,
+ new
BitMapValueCombiner() // Merge function: unions the two sets when both maps contain the same key
+ ));
+
+ // Find the size of the largest merged set; it is passed to serialize() as maxWidth
+ OptionalInt maxWidthOpt =
bitMapOut.values().stream().mapToInt(Set::size).max();
+ if (maxWidthOpt.isEmpty()) { // max() is empty only when the merged map has no entries at all
+ throw new IllegalArgumentException("Corrupt sketch:
metadata is invalid");
+ }
+
+ int maxWidth = maxWidthOpt.getAsInt();
+ MatrixBlock blkOutCorr = serialize(bitMapOut, maxWidth); // Re-encode the merged map as a matrix block
+
+ return new CorrMatrixBlock(arg0.getValue(), blkOutCorr); // Result keeps arg0's value block and carries the merged sketch as its correction
}
@Override
diff --git
a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
index 5a7a61c6ce..c32ec12a54 100644
---
a/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
+++
b/src/test/java/org/apache/sysds/test/functions/countDistinct/CountDistinctRowColBase.java
@@ -58,6 +58,13 @@ public abstract class CountDistinctRowColBase extends
CountDistinctBase {
countDistinctScalarTest(1723, 5000, 2000, 1.0, ex, tolerance);
}
+ @Test
+ public void testSparkDenseXLarge() { // Calls countDistinctScalarTest with the same literal arguments as the call in the preceding context line, using ExecType.SPARK
+ ExecType ex = ExecType.SPARK;
+ double tolerance = baseTolerance + 1723 * percentTolerance; // Tolerance scales with 1723, the first argument below (presumably the expected distinct count; confirm against countDistinctScalarTest)
+ countDistinctScalarTest(1723, 5000, 2000, 1.0, ex, tolerance);
+ }
+
@Test
public void testCPDense1Unique() {
ExecType ex = ExecType.CP;