This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new fb5126a  [SYSTEMDS-3289] Multithreaded equi-height binning
fb5126a is described below

commit fb5126a21ac249f04309f50c8ecab218c7104781
Author: arnabp <[email protected]>
AuthorDate: Thu Feb 3 20:44:08 2022 +0100

    [SYSTEMDS-3289] Multithreaded equi-height binning
    
    This patch adds multithreaded support to equi-height binning
    in transformencode/apply. We use partition-sorting and a
    heap-based merging of sorted blocks.
    
    Closes #1495.
---
 ...ltiReturnParameterizedBuiltinSPInstruction.java |  24 ++--
 .../runtime/transform/encode/ColumnEncoderBin.java | 137 ++++++++++++++++-----
 .../TransformFrameBuildMultithreadedTest.java      |  28 +++--
 3 files changed, 144 insertions(+), 45 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 9c8c08b..bca0825 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -315,16 +315,22 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                        }
                        // handle bin boundaries
                        else if(_encoder.containsEncoderForID(colID, 
ColumnEncoderBin.class)) {
-                               double min = Double.MAX_VALUE;
-                               double max = -Double.MAX_VALUE;
-                               while(iter.hasNext()) {
-                                       double value = 
Double.parseDouble(iter.next().toString());
-                                       min = Math.min(min, value);
-                                       max = Math.max(max, value);
-                               }
                                ColumnEncoderBin baEncoder = 
_encoder.getColumnEncoder(colID, ColumnEncoderBin.class);
-                               assert baEncoder != null;
-                               baEncoder.computeBins(min, max);
+                               if (baEncoder.getBinMethod() == 
ColumnEncoderBin.BinMethod.EQUI_WIDTH) {
+                                       double min = Double.MAX_VALUE;
+                                       double max = -Double.MAX_VALUE;
+                                       while(iter.hasNext()) {
+                                               double value = 
Double.parseDouble(iter.next().toString());
+                                               min = Math.min(min, value);
+                                               max = Math.max(max, value);
+                                       }
+                                       assert baEncoder != null;
+                                       baEncoder.computeBins(min, max);
+                               }
+                               else //TODO: support equi-height
+                                       throw new DMLRuntimeException("Binning 
method "+baEncoder.getBinMethod().toString()
+                                               +" is not support for Spark");
+
                                double[] binMins = baEncoder.getBinMins();
                                double[] binMaxs = baEncoder.getBinMaxs();
                                for(int i = 0; i < binMins.length; i++) {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index fdc1ad6..cb6e0af 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -26,6 +26,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.lang3.tuple.MutableTriple;
@@ -43,7 +44,6 @@ public class ColumnEncoderBin extends ColumnEncoder {
        private static final long serialVersionUID = 1917445005206076078L;
        protected int _numBin = -1;
        private BinMethod _binMethod = BinMethod.EQUI_WIDTH;
-       private double[] _sortedInput = null;
 
        // frame transform-apply attributes
        // a) column bin boundaries
@@ -86,6 +86,17 @@ public class ColumnEncoderBin extends ColumnEncoder {
                return _binMaxs;
        }
 
+       public BinMethod getBinMethod() {
+               return _binMethod;
+       }
+
+       public void setBinMethod(String method) {
+               if (method.equalsIgnoreCase(BinMethod.EQUI_WIDTH.toString()))
+                       _binMethod = BinMethod.EQUI_WIDTH;
+               if (method.equalsIgnoreCase(BinMethod.EQUI_HEIGHT.toString()))
+                       _binMethod = BinMethod.EQUI_HEIGHT;
+       }
+
        @Override
        public void build(CacheBlock in) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -96,8 +107,8 @@ public class ColumnEncoderBin extends ColumnEncoder {
                        computeBins(pairMinMax[0], pairMinMax[1]);
                }
                else if(_binMethod == BinMethod.EQUI_HEIGHT) {
-                       prepareDataForEqualHeightBins(in, _colID, 0, -1);
-                       computeEqualHeightBins();
+                       double[] sortedCol = prepareDataForEqualHeightBins(in, 
_colID, 0, -1);
+                       computeEqualHeightBins(sortedCol);
                }
 
                if(DMLScript.STATISTICS)
@@ -177,18 +188,19 @@ public class ColumnEncoderBin extends ColumnEncoder {
                return new double[] {min, max};
        }
 
-       private void prepareDataForEqualHeightBins(CacheBlock in, int colID, 
int startRow, int blockSize) {
-               int numRows = getEndIndex(in.getNumRows(), startRow, blockSize) 
- startRow;
-               _sortedInput = new double[numRows];
-               for(int i = startRow; i < numRows; i++) {
+       private static double[] prepareDataForEqualHeightBins(CacheBlock in, 
int colID, int startRow, int blockSize) {
+               int endRow = getEndIndex(in.getNumRows(), startRow, blockSize);
+               double[] vals = new double[endRow-startRow];
+               for(int i = startRow; i < endRow; i++) {
                        double inVal = in.getDouble(i, colID - 1);
                        //FIXME current NaN handling introduces 0s and thus
                        // impacts the computation of bin boundaries
                        if(Double.isNaN(inVal))
                                continue;
-                       _sortedInput[i] = inVal;
+                       vals[i-startRow] = inVal;
                }
-               Arrays.sort(_sortedInput);
+               Arrays.sort(vals);
+               return vals;
        }
 
        @Override
@@ -197,9 +209,9 @@ public class ColumnEncoderBin extends ColumnEncoder {
        }
 
        @Override
-       public Callable<Object> getPartialBuildTask(CacheBlock in, int 
startRow, int blockSize, 
+       public Callable<Object> getPartialBuildTask(CacheBlock in, int 
startRow, int blockSize,
                        HashMap<Integer, Object> ret) {
-               return new BinPartialBuildTask(in, _colID, startRow, blockSize, 
ret);
+               return new BinPartialBuildTask(in, _colID, startRow, blockSize, 
_binMethod, ret);
        }
 
        @Override
@@ -219,20 +231,20 @@ public class ColumnEncoderBin extends ColumnEncoder {
                }
        }
 
-       private void computeEqualHeightBins() {
+       private void computeEqualHeightBins(double[] sortedCol) {
                if(_binMins == null || _binMaxs == null) {
                        _binMins = new double[_numBin];
                        _binMaxs = new double[_numBin];
                }
-               int n = _sortedInput.length;
+               int n = sortedCol.length;
                for(int i = 0; i < _numBin; i++) {
                        double pos = n * (i + 1d) / _numBin;
                        _binMaxs[i] = (pos % 1 == 0) ? // pos is integer
-                               _sortedInput[(int) pos-1] :
-                               _sortedInput[(int) Math.floor(pos)];
+                               sortedCol[(int) pos-1] :
+                               sortedCol[(int) Math.floor(pos)];
                }
-               _binMaxs[_numBin-1] = _sortedInput[n-1];
-               _binMins[0] = _sortedInput[0];
+               _binMaxs[_numBin-1] = sortedCol[n-1];
+               _binMins[0] = sortedCol[0];
                System.arraycopy(_binMaxs, 0, _binMins, 1, _numBin - 1);
        }
 
@@ -324,6 +336,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
                super.writeExternal(out);
 
                out.writeInt(_numBin);
+               out.writeUTF(_binMethod.toString());
                out.writeBoolean(_binMaxs != null);
                if(_binMaxs != null) {
                        for(int j = 0; j < _binMaxs.length; j++) {
@@ -337,6 +350,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
        public void readExternal(ObjectInput in) throws IOException {
                super.readExternal(in);
                _numBin = in.readInt();
+               setBinMethod(in.readUTF());
                boolean minmax = in.readBoolean();
                _binMaxs = minmax ? new double[_numBin] : null;
                _binMins = minmax ? new double[_numBin] : null;
@@ -385,24 +399,34 @@ public class ColumnEncoderBin extends ColumnEncoder {
                private final int _blockSize;
                private final int _startRow;
                private final int _colID;
-               private final HashMap<Integer, Object> _partialMinMax;
+               private final BinMethod _method;
+               private final HashMap<Integer, Object> _partialData;
 
                // if a pool is passed the task may be split up into multiple 
smaller tasks.
                protected BinPartialBuildTask(CacheBlock input, int colID, int 
startRow, 
-                               int blocksize, HashMap<Integer, Object> 
partialMinMax) {
+                               int blocksize, BinMethod method, 
HashMap<Integer, Object> partialData) {
                        _input = input;
                        _blockSize = blocksize;
                        _colID = colID;
                        _startRow = startRow;
-                       _partialMinMax = partialMinMax;
+                       _method = method;
+                       _partialData = partialData;
                }
 
                @Override
                public double[] call() throws Exception {
                        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-                       double[] minMax = getMinMaxOfCol(_input, _colID, 
_startRow, _blockSize);
-                       synchronized (_partialMinMax){
-                               _partialMinMax.put(_startRow, minMax);
+                       if (_method == BinMethod.EQUI_WIDTH) {
+                               double[] minMax = getMinMaxOfCol(_input, 
_colID, _startRow, _blockSize);
+                               synchronized(_partialData) {
+                                       _partialData.put(_startRow, minMax);
+                               }
+                       }
+                       if (_method == BinMethod.EQUI_HEIGHT) {
+                               double[] sortedVals = 
prepareDataForEqualHeightBins(_input, _colID, _startRow, _blockSize);
+                               synchronized(_partialData) {
+                                       _partialData.put(_startRow, sortedVals);
+                               }
                        }
                        if (DMLScript.STATISTICS)
                                
TransformStatistics.incBinningBuildTime(System.nanoTime()-t0);
@@ -424,16 +448,56 @@ public class ColumnEncoderBin extends ColumnEncoder {
                        _encoder = encoderBin;
                }
 
+               private double[] mergeKSortedArrays(double[][] arrs) {
+                       //PriorityQueue is heap in Java
+                       PriorityQueue<ArrayContainer> queue;
+                       queue = new PriorityQueue<>();
+                       int total=0;
+
+                       //add arrays to heap
+                       for(double[] arr : arrs) {
+                               queue.add(new ArrayContainer(arr, 0));
+                               total = total + arr.length;
+                       }
+                       int m=0;
+                       double[] result = new double[total];
+
+                       //while heap is not empty
+                       while(!queue.isEmpty()){
+                               ArrayContainer ac = queue.poll();
+                               result[m++]=ac.arr[ac.index];
+                               if(ac.index < ac.arr.length-1){
+                                       queue.add(new ArrayContainer(ac.arr, 
ac.index+1));
+                               }
+                       }
+                       return result;
+               }
+
                @Override
                public Object call() throws Exception {
                        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-                       double min = Double.POSITIVE_INFINITY;
-                       double max = Double.NEGATIVE_INFINITY;
-                       for(Object minMax : _partialMaps.values()) {
-                               min = Math.min(min, ((double[]) minMax)[0]);
-                               max = Math.max(max, ((double[]) minMax)[1]);
+                       if (_encoder.getBinMethod() == BinMethod.EQUI_WIDTH) {
+                               double min = Double.POSITIVE_INFINITY;
+                               double max = Double.NEGATIVE_INFINITY;
+                               for(Object minMax : _partialMaps.values()) {
+                                       min = Math.min(min, ((double[]) 
minMax)[0]);
+                                       max = Math.max(max, ((double[]) 
minMax)[1]);
+                               }
+                               _encoder.computeBins(min, max);
+                       }
+
+                       if (_encoder.getBinMethod() == BinMethod.EQUI_HEIGHT) {
+                               double[][] allParts = new 
double[_partialMaps.size()][];
+                               int i = 0;
+                               for (Object arr: _partialMaps.values())
+                                       allParts[i++] = (double[]) arr;
+
+                               // Heap-based merging of sorted partitions.
+                               // TODO: Derive bin boundaries from partial 
aggregates, avoiding
+                               // materializing the sorted arrays (e.g. 
federated quantile)
+                               double[] sortedRes = 
mergeKSortedArrays(allParts);
+                               _encoder.computeEqualHeightBins(sortedRes);
                        }
-                       _encoder.computeBins(min, max);
 
                        if(DMLScript.STATISTICS)
                                
TransformStatistics.incBinningBuildTime(System.nanoTime()-t0);
@@ -446,6 +510,21 @@ public class ColumnEncoderBin extends ColumnEncoder {
                }
        }
 
+       private static class ArrayContainer implements 
Comparable<ArrayContainer> {
+               double[] arr;
+               int index;
+
+               public ArrayContainer(double[] arr, int index) {
+                       this.arr = arr;
+                       this.index = index;
+               }
+
+               @Override
+               public int compareTo(ArrayContainer o) {
+                       return this.arr[this.index] < o.arr[o.index] ? -1 : 1;
+               }
+       }
+
        private static class ColumnBinBuildTask implements Callable<Object> {
                private final ColumnEncoderBin _encoder;
                private final CacheBlock _input;
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
index 0c235d6..27d7c78 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
@@ -52,7 +52,8 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
        //private final static String SPEC1b = 
"homes3/homes.tfspec_recode2.json";
        private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
        //private final static String SPEC2b = 
"homes3/homes.tfspec_dummy2.json";
-       private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // 
recode
+       private final static String SPEC3a = "homes3/homes.tfspec_bin.json"; // 
recode
+       private final static String SPEC3b = 
"homes3/homes.tfspec_bin_height.json"; // recode
        //private final static String SPEC3b = "homes3/homes.tfspec_bin2.json"; 
// recode
        private final static String SPEC6 = 
"homes3/homes.tfspec_recode_dummy.json";
        //private final static String SPEC6b = 
"homes3/homes.tfspec_recode_dummy2.json";
@@ -65,7 +66,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
        private final static String SPEC10 = 
"homes3/homes.tfspec_recode_bin.json";
 
        public enum TransformType {
-               RECODE, DUMMY, RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE, 
RECODE_BIN,
+               RECODE, DUMMY, RECODE_DUMMY, BIN_WIDTH, BIN_HEIGHT, BIN_DUMMY, 
HASH, HASH_RECODE, RECODE_BIN,
        }
 
        @Override
@@ -101,12 +102,21 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 
        @Test
        public void testHomesBuildBinSingleNodeCSV() {
-               runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", 
TransformType.BIN, 0);
+               runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", 
TransformType.BIN_WIDTH, 0);
        }
 
        @Test
        public void testHomesBuild50BinSingleNodeCSV() {
-               runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", 
TransformType.BIN, 50);
+               runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", 
TransformType.BIN_WIDTH, 50);
+       }
+       @Test
+       public void testHomesBuildBinEQHTCSV() {
+               runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", 
TransformType.BIN_HEIGHT, 0);
+       }
+
+       @Test
+       public void testHomesBuild50BinEQHTCSV() {
+               runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", 
TransformType.BIN_HEIGHT, 50);
        }
 
        @Test
@@ -132,8 +142,12 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
                                SPEC = SPEC2;
                                DATASET = DATASET1;
                                break;
-                       case BIN:
-                               SPEC = SPEC3;
+                       case BIN_WIDTH:
+                               SPEC = SPEC3a;
+                               DATASET = DATASET1;
+                               break;
+                       case BIN_HEIGHT:
+                               SPEC = SPEC3b;
                                DATASET = DATASET1;
                                break;
                        case RECODE_DUMMY:
@@ -191,7 +205,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
                                        
assertEquals(encodersS.get(i).getRcdMap().keySet(), 
encodersM.get(i).getRcdMap().keySet());
                                }
                        }
-                       else if(type == TransformType.BIN) {
+                       else if(type == TransformType.BIN_WIDTH || type == 
TransformType.BIN_HEIGHT) {
                                List<ColumnEncoderBin> encodersS = 
encoderS.getColumnEncoders(ColumnEncoderBin.class);
                                List<ColumnEncoderBin> encodersM = 
encoderM.getColumnEncoders(ColumnEncoderBin.class);
                                assertEquals(encodersS.size(), 
encodersM.size());

Reply via email to