This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new fb2447274d [SYSTEMDS-3708] Add permutation-matrix method to raGroupby fb2447274d is described below commit fb2447274dc6101f8e1eeec0a7697662b2a3f74a Author: gghsu <ppp432...@gmail.com> AuthorDate: Sat Jul 27 18:54:13 2024 +0200 [SYSTEMDS-3708] Add permutation-matrix method to raGroupby Closes #2052. --- scripts/builtin/raGroupby.dml | 135 +++++++++++++++------ scripts/builtin/raJoin.dml | 32 +++-- .../builtin/part2/BuiltinRaGroupbyTest.java | 76 +++++++++--- src/test/scripts/functions/builtin/raGroupby.dml | 5 +- 4 files changed, 167 insertions(+), 81 deletions(-) diff --git a/scripts/builtin/raGroupby.dml b/scripts/builtin/raGroupby.dml index 1499bc0fcf..7d7035c0ff 100644 --- a/scripts/builtin/raGroupby.dml +++ b/scripts/builtin/raGroupby.dml @@ -26,7 +26,7 @@ # ------------------------------------------------------------------------------ # X Matrix of input data [shape: N x M] # col Integer indicating the column index to execute grupby command -# method Groupby implemention method (nested-loop) +# method Groupby implemention method (nested-loop, permutation-matrix) # ------------------------------------------------------------------------------ # # OUTPUT: @@ -34,50 +34,105 @@ # Y Matrix of selected data [shape N' x M] with N' <= N # ------------------------------------------------------------------------------ -m_raGroupby = function (Matrix[Double] X, Integer col, String method="nested-loop") +m_raGroupby = function (Matrix[Double] X, Integer col, String method) return (Matrix[Double] Y) { - # Extract and sort unique values from the specified column (1-based index) - uniqueValues = unique(X[, col]) - order_uniqueValues = order(target = uniqueValues, by = 1); - - # Calcute the number of groups - numGroups = nrow(uniqueValues) - - # Determine the maximum number of rows in any group - maxRowsInGroup = max(table(X[,col],1)); - - # Define a zero matrix to put the group data into - Y=matrix(0,numGroups,maxRowsInGroup*(ncol(X)-1)+1) - - # Put the ordered uniqueValues into first column of Y as group_id - Y[,1] = order_uniqueValues - - # Loop for each group - for(i in 1:numGroups){ - index = 0 - - # Iterate each row in matrix X to deal with group data - for ( j in 1:nrow(X) ) { - if ( as.scalar( X[j,col] == order_uniqueValues[i,1] )) { - # Define the formula of the start and end column position - startCol = index*(ncol(X)-1) +2 - endCol = startCol + (ncol(X)-2) - - if (col == 1) { - # Case when the selected column is the first column - Y[i,startCol:endCol]=X[j,2:ncol(X)] - } else if (col == ncol(X)) { - # Case when the selected column is the last column - Y[i,startCol:endCol]=X[j,1:(ncol(X)-1)] - } else { - # General case - newRow = cbind(X[j, 1:(col-1)], X[j, (col+1):ncol(X)]) - Y[i,startCol:endCol]=newRow + if (method == "nested-loop") { + # Extract and sort unique values from the specified column (1-based index) + uniqueValues = unique(X[, col]) + order_uniqueValues = order(target = uniqueValues, by = 1); + + # Calcute the number of groups + numGroups = nrow(uniqueValues) + + # Determine the maximum number of rows in any group + maxRowsInGroup = max(table(X[,col],1)); + + # Define a zero matrix to put the group data into + Y = matrix(0,numGroups,maxRowsInGroup*(ncol(X)-1)+1) + + # Put the ordered uniqueValues into first column of Y as group_id + #Y[,1] = order_uniqueValues + Y[,1] = uniqueValues + + # Loop for each group + for(i in 1:numGroups){ + index = 0 + + # Iterate each row in matrix X to deal with group data + for ( j in 1:nrow(X) ) { + if ( as.scalar( X[j,col] == uniqueValues[i,1] )) { + # Define the formula of the start and end column position + startCol = index*(ncol(X)-1) +2 + endCol = startCol + (ncol(X)-2) + + if (col == 1) { + # Case when the selected column is the first column + Y[i,startCol:endCol] = X[j,2:ncol(X)] + } + else if (col == ncol(X)) { + # Case when the selected column is the last column + Y[i,startCol:endCol] = X[j,1:(ncol(X)-1)] + } + else { + # General case + newRow = cbind(X[j, 1:(col-1)], X[j, (col+1):ncol(X)]) + Y[i,startCol:endCol] = newRow + } + index = index +1 } - index = index +1 } } } + else if (method == "permutation-matrix") { + # Extract the grouping column and create unique groups + key = X[,col] + key_unique = unique(X[, col]) + numGroups = nrow(key_unique) + + # Matrix for comparison + key_compare = key_unique %*% matrix(1, rows=1, cols=nrow(X)) + key_matrix = matrix(1, rows=nrow(key_unique), cols=1) %*% t(key) + + # Find group index + groupIndex = rowIndexMax(t(key_compare == key_matrix)) + + # Determine the maximum number of rows in any group + maxRowsInGroup = max(table(X[,col],1)) + totalCells = (maxRowsInGroup) * (ncol(X)-1) +1 + + # Create permutation matrix P copy relevant tuples with a single matrix multiplication + P = matrix(0, rows=nrow(X), cols=numGroups * maxRowsInGroup) + # Create offsets to store the first column of each group + offsets = matrix(seq(0, (numGroups-1)*maxRowsInGroup, maxRowsInGroup), rows=numGroups, cols=1) + + # Create row and column index for the permutation matrix + rowIndex = seq(1, nrow(X)) + indexWithInGroups = cumsum(t(table(groupIndex, seq(1, nrow(X)), numGroups, nrow(X)))) + selectedMatrix = table(seq(1, nrow(indexWithInGroups)), groupIndex) + colIndex = groupIndex * maxRowsInGroup - maxRowsInGroup + rowSums(indexWithInGroups * selectedMatrix) + + # Set values in P + P = table(seq(1, nrow(X)), colIndex) + + # Perform matrix multiplication + Y_temp = t(P) %*% X + + # Remove the selected column from Y_temp + if( col == 1 ) { + Y_temp_reduce = Y_temp[, col+1:ncol(Y_temp)] + } + else if( col == ncol(X) ) { + Y_temp_reduce = Y_temp[, 1:col-1] + } + else{ + Y_temp_reduce = cbind(Y_temp[, 1:col-1],Y_temp[, col+1:ncol(Y_temp)]) + } + + # Set value of final output + Y = matrix(0, rows=numGroups, cols=totalCells) + Y[,1] = key_unique + Y[,2:ncol(Y)] = matrix(Y_temp_reduce, rows=numGroups, cols=totalCells-1) + } } diff --git a/scripts/builtin/raJoin.dml b/scripts/builtin/raJoin.dml index bddf94c019..2750b598f3 100644 --- a/scripts/builtin/raJoin.dml +++ b/scripts/builtin/raJoin.dml @@ -28,7 +28,7 @@ # colA Integer indicating the column index of matrix A to execute inner join command # B Matrix of right left data [shape: N x M] # colA Integer indicating the column index of matrix B to execute inner join command -# method Join implementation method (nested-loop, sort-merge) +# method Join implementation method (nested-loop, sort-merge, hash) # ------------------------------------------------------------------------------ # # OUTPUT: @@ -37,7 +37,7 @@ # ------------------------------------------------------------------------------ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B, - Integer colB, String method="sort-merge") + Integer colB, String method) return (Matrix[Double] Y) { # Sort the input Matrix with specific column in order to ensure same output order @@ -65,8 +65,8 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B, right = B[, colB] # Sort join keys - leftIdx = order(target = left, decreasing=FALSE) - rightIdx = order(target = right, decreasing=FALSE) + leftIdx = seq(1, nrow(A)) + rightIdx = seq(1, nrow(B)) # Ensure histograms are aligned by creating a common set of keys commonKeys = max(max(left), max(right)); @@ -94,18 +94,15 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B, # Determine the number of rows in outBucket num_rows = nrow(outBucket) - # Initialize a matrix to store the result - updatedoffset = matrix(0, rows=num_rows, cols=1) - leftOutIdx = matrix(0, rows=num_rows, cols=1) - rightOutIdx = matrix(0, rows=num_rows, cols=1) - # Compute the element-wise subtraction and store in result # TODO performance - try avoid iterating over rows - for(i in 1:num_rows) { - updatedoffset[i, 1] = offset[i, 1] - (cumHistMul[as.scalar(outBucket[i, 1]), 1] - histMul[as.scalar(outBucket[i, 1]), 1]) -1 - leftOutIdx[i, 1] = as.scalar(cumLeftHist[as.scalar(outBucket[i, 1]), 1] - leftHist[as.scalar(outBucket[i, 1]), 1] + floor(updatedoffset[i, 1] / rightHist[as.scalar(outBucket[i, 1]), 1])) +1 - rightOutIdx[i, 1] = as.scalar(cumRightHist[as.scalar(outBucket[i, 1]), 1] - rightHist[as.scalar(outBucket[i, 1]), 1] + (updatedoffset[i, 1] %% rightHist[as.scalar(outBucket[i, 1]), 1])) +1 - } + # create a mask to apply the outBucket value as an index of following matrix + seqMatrix = matrix(1, rows=nrow(outBucket), cols=1) %*% t(seq(1, nrow(cumHistMul))) + mask = (outBucket == seqMatrix) + + updatedoffset = offset - (mask %*% (cumHistMul - histMul)) - 1 + leftOutIdx = mask %*% (cumLeftHist - leftHist) + (floor(updatedoffset / mask %*% rightHist)) + 1 + rightOutIdx = mask %*% (cumRightHist - rightHist) + (updatedoffset %% (mask %*% rightHist)) + 1 nrows = length(offset) ncolsA = ncol(A) @@ -118,8 +115,6 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B, Y[i, (ncolsA + 1):(ncolsA + ncolsB)] = B[as.scalar(rightOutIdx[i, 1]), ] } } - # TODO hash-based method which constructs permutation tables to replicate - # tuples of lhs and rhs and simply concatenates these tuples via cbind else{ Y = matrix(0, rows=0, cols=1) } @@ -152,7 +147,7 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B, # Function to perform parallel binary search parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul) - return (Matrix[double] matched_result) +return (Matrix[double] matched_result) { n = nrow(cumHistMul) result = matrix(0, rows=nrow(offset), cols=1) @@ -164,7 +159,8 @@ parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMu if ( as.scalar(offset[i] <= cumHistMul[mid]) ) { result[i] = mid high = mid - 1 - } else { + } + else { low = mid + 1 } } diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java index 6db3c46d3b..770f8c29e4 100644 --- a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java +++ b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java @@ -42,7 +42,46 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase } @Test - public void testRaGroupbyTest() { + public void testRaGroupbyTest1() { + testRaGroupbyTest("nested-loop"); + } + + @Test + public void testRaGroupbyTest2() { + testRaGroupbyTest("permutation-matrix"); + } + + @Test + public void testRaGroupbyTestwithDifferentColumn1() { + testRaGroupbyTestwithDifferentColumn("nested-loop"); + } + + @Test + public void testRaGroupbyTestwithDifferentColumn2() { + testRaGroupbyTestwithDifferentColumn("permutation-matrix"); + } + + @Test + public void testRaGroupbyTestwithNoGroup1() { + testRaGroupbyTestwithNoGroup("nested-loop"); + } + + @Test + public void testRaGroupbyTestwithNoGroup2() { + testRaGroupbyTestwithNoGroup("permutation-matrix"); + } + + @Test + public void testRaGroupbyTestwithOneGroup1() { + testRaGroupbyTestwithOneGroup("nested-loop"); + } + + @Test + public void testRaGroupbyTestwithOneGroup2() { + testRaGroupbyTestwithOneGroup("permutation-matrix"); + } + + public void testRaGroupbyTest(String method) { //generate actual dataset and variables double[][] X = { {1, 2, 3}, @@ -58,11 +97,10 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase {4, 7, 8, 7, 8, 8, 9} }; - runRaGroupbyTest(X, select_col, Y); + runRaGroupbyTest(X, select_col, Y, method); } - @Test - public void testRaGroupbyTestwithDifferentColumn() { + public void testRaGroupbyTestwithDifferentColumn(String method) { //generate actual dataset and variables double[][] X = { {1, 2, 3}, @@ -75,16 +113,15 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase // Expected output matrix double[][] Y = { {2, 1, 3, 0, 0}, + {8, 4, 9, 0, 0}, {3, 1, 6, 0, 0}, - {7, 4, 8, 4, 8}, - {8, 4, 9, 0, 0} + {7, 4, 8, 4, 8} }; - runRaGroupbyTest(X, select_col, Y); + runRaGroupbyTest(X, select_col, Y, method); } - @Test - public void testRaGroupbyTestwithNoGroup() { + public void testRaGroupbyTestwithNoGroup(String method) { // Test case with different values in select_col double[][] X = { {1, 1, 1}, @@ -98,16 +135,15 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase double[][] Y = { {1, 1, 1}, {2, 2, 2}, - {3, 3, 1}, {4, 4, 2}, - {5, 5, 1} + {5, 5, 1}, + {3, 3, 1} }; - runRaGroupbyTest(X, select_col, Y); + runRaGroupbyTest(X, select_col, Y, method); } - @Test - public void testRaGroupbyTestwithOneGroup() { + public void testRaGroupbyTestwithOneGroup(String method) { //generate actual dataset and variables double[][] X = { {1, 2, 3, 8, 2}, @@ -122,10 +158,10 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase {8, 1, 2, 3, 2, 4, 7, 8, 3, 1, 3, 6, 4, 4, 7, 8, 5, 4, 8, 9, 6}, }; - runRaGroupbyTest(X, select_col, Y); + runRaGroupbyTest(X, select_col, Y, method); } - private void runRaGroupbyTest(double [][] X, int col, double [][] Y) + private void runRaGroupbyTest(double [][] X, int col, double [][] Y, String method) { ExecMode platformOld = setExecMode(ExecMode.SINGLE_NODE); @@ -135,16 +171,16 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase String HOME = SCRIPT_DIR + TEST_DIR; fullDMLScriptName = HOME + TEST_NAME + ".dml"; + + //test groupby methods programArgs = new String[]{"-stats", "-args", - input("X"), String.valueOf(col), output("result") }; - System.out.println(Arrays.deepToString(X)); - System.out.println(col); + input("X"), String.valueOf(col), method, output("result") }; + //fullRScriptName = HOME + TEST_NAME + ".R"; //rCmd = "Rscript" + " " + fullRScriptName + " " // + inputDir() + " " + col + " " + expectedDir(); writeInputMatrixWithMTD("X", X, true); - System.out.println(Arrays.deepToString(X)); //writeExpectedMatrix("result", Y); // run dmlScript and RScript diff --git a/src/test/scripts/functions/builtin/raGroupby.dml b/src/test/scripts/functions/builtin/raGroupby.dml index b93f9add14..4676e186a1 100644 --- a/src/test/scripts/functions/builtin/raGroupby.dml +++ b/src/test/scripts/functions/builtin/raGroupby.dml @@ -22,7 +22,6 @@ X = read($1) col = as.integer($2) -result = raGroupby(X, col, "nested-loop"); -write(result, $3); -print(toString(result)) +result = raGroupby(X, col, $3); +write(result, $4);