This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new fb2447274d [SYSTEMDS-3708] Add permutation-matrix method to raGroupby
fb2447274d is described below
commit fb2447274dc6101f8e1eeec0a7697662b2a3f74a
Author: gghsu <[email protected]>
AuthorDate: Sat Jul 27 18:54:13 2024 +0200
[SYSTEMDS-3708] Add permutation-matrix method to raGroupby
Closes #2052.
---
scripts/builtin/raGroupby.dml | 135 +++++++++++++++------
scripts/builtin/raJoin.dml | 32 +++--
.../builtin/part2/BuiltinRaGroupbyTest.java | 76 +++++++++---
src/test/scripts/functions/builtin/raGroupby.dml | 5 +-
4 files changed, 167 insertions(+), 81 deletions(-)
diff --git a/scripts/builtin/raGroupby.dml b/scripts/builtin/raGroupby.dml
index 1499bc0fcf..7d7035c0ff 100644
--- a/scripts/builtin/raGroupby.dml
+++ b/scripts/builtin/raGroupby.dml
@@ -26,7 +26,7 @@
# ------------------------------------------------------------------------------
# X Matrix of input data [shape: N x M]
# col Integer indicating the column index to execute grupby command
-# method Groupby implemention method (nested-loop)
+# method Groupby implemention method (nested-loop, permutation-matrix)
# ------------------------------------------------------------------------------
#
# OUTPUT:
@@ -34,50 +34,105 @@
# Y Matrix of selected data [shape N' x M] with N' <= N
# ------------------------------------------------------------------------------
-m_raGroupby = function (Matrix[Double] X, Integer col, String method="nested-loop")
+m_raGroupby = function (Matrix[Double] X, Integer col, String method)
return (Matrix[Double] Y)
{
- # Extract and sort unique values from the specified column (1-based index)
- uniqueValues = unique(X[, col])
- order_uniqueValues = order(target = uniqueValues, by = 1);
-
- # Calcute the number of groups
- numGroups = nrow(uniqueValues)
-
- # Determine the maximum number of rows in any group
- maxRowsInGroup = max(table(X[,col],1));
-
- # Define a zero matrix to put the group data into
- Y=matrix(0,numGroups,maxRowsInGroup*(ncol(X)-1)+1)
-
- # Put the ordered uniqueValues into first column of Y as group_id
- Y[,1] = order_uniqueValues
-
- # Loop for each group
- for(i in 1:numGroups){
- index = 0
-
- # Iterate each row in matrix X to deal with group data
- for ( j in 1:nrow(X) ) {
- if ( as.scalar( X[j,col] == order_uniqueValues[i,1] )) {
- # Define the formula of the start and end column position
- startCol = index*(ncol(X)-1) +2
- endCol = startCol + (ncol(X)-2)
-
- if (col == 1) {
- # Case when the selected column is the first column
- Y[i,startCol:endCol]=X[j,2:ncol(X)]
- } else if (col == ncol(X)) {
- # Case when the selected column is the last column
- Y[i,startCol:endCol]=X[j,1:(ncol(X)-1)]
- } else {
- # General case
- newRow = cbind(X[j, 1:(col-1)], X[j, (col+1):ncol(X)])
- Y[i,startCol:endCol]=newRow
+ if (method == "nested-loop") {
+ # Extract and sort unique values from the specified column (1-based index)
+ uniqueValues = unique(X[, col])
+ order_uniqueValues = order(target = uniqueValues, by = 1);
+
+ # Calcute the number of groups
+ numGroups = nrow(uniqueValues)
+
+ # Determine the maximum number of rows in any group
+ maxRowsInGroup = max(table(X[,col],1));
+
+ # Define a zero matrix to put the group data into
+ Y = matrix(0,numGroups,maxRowsInGroup*(ncol(X)-1)+1)
+
+ # Put the ordered uniqueValues into first column of Y as group_id
+ #Y[,1] = order_uniqueValues
+ Y[,1] = uniqueValues
+
+ # Loop for each group
+ for(i in 1:numGroups){
+ index = 0
+
+ # Iterate each row in matrix X to deal with group data
+ for ( j in 1:nrow(X) ) {
+ if ( as.scalar( X[j,col] == uniqueValues[i,1] )) {
+ # Define the formula of the start and end column position
+ startCol = index*(ncol(X)-1) +2
+ endCol = startCol + (ncol(X)-2)
+
+ if (col == 1) {
+ # Case when the selected column is the first column
+ Y[i,startCol:endCol] = X[j,2:ncol(X)]
+ }
+ else if (col == ncol(X)) {
+ # Case when the selected column is the last column
+ Y[i,startCol:endCol] = X[j,1:(ncol(X)-1)]
+ }
+ else {
+ # General case
+ newRow = cbind(X[j, 1:(col-1)], X[j, (col+1):ncol(X)])
+ Y[i,startCol:endCol] = newRow
+ }
+ index = index +1
}
- index = index +1
}
}
}
+ else if (method == "permutation-matrix") {
+ # Extract the grouping column and create unique groups
+ key = X[,col]
+ key_unique = unique(X[, col])
+ numGroups = nrow(key_unique)
+
+ # Matrix for comparison
+ key_compare = key_unique %*% matrix(1, rows=1, cols=nrow(X))
+ key_matrix = matrix(1, rows=nrow(key_unique), cols=1) %*% t(key)
+
+ # Find group index
+ groupIndex = rowIndexMax(t(key_compare == key_matrix))
+
+ # Determine the maximum number of rows in any group
+ maxRowsInGroup = max(table(X[,col],1))
+ totalCells = (maxRowsInGroup) * (ncol(X)-1) +1
+
+ # Create permutation matrix P copy relevant tuples with a single matrix multiplication
+ P = matrix(0, rows=nrow(X), cols=numGroups * maxRowsInGroup)
+ # Create offsets to store the first column of each group
+ offsets = matrix(seq(0, (numGroups-1)*maxRowsInGroup, maxRowsInGroup), rows=numGroups, cols=1)
+
+ # Create row and column index for the permutation matrix
+ rowIndex = seq(1, nrow(X))
+ indexWithInGroups = cumsum(t(table(groupIndex, seq(1, nrow(X)), numGroups, nrow(X))))
+ selectedMatrix = table(seq(1, nrow(indexWithInGroups)), groupIndex)
+ colIndex = groupIndex * maxRowsInGroup - maxRowsInGroup + rowSums(indexWithInGroups * selectedMatrix)
+
+ # Set values in P
+ P = table(seq(1, nrow(X)), colIndex)
+
+ # Perform matrix multiplication
+ Y_temp = t(P) %*% X
+
+ # Remove the selected column from Y_temp
+ if( col == 1 ) {
+ Y_temp_reduce = Y_temp[, col+1:ncol(Y_temp)]
+ }
+ else if( col == ncol(X) ) {
+ Y_temp_reduce = Y_temp[, 1:col-1]
+ }
+ else{
+ Y_temp_reduce = cbind(Y_temp[, 1:col-1],Y_temp[, col+1:ncol(Y_temp)])
+ }
+
+ # Set value of final output
+ Y = matrix(0, rows=numGroups, cols=totalCells)
+ Y[,1] = key_unique
+ Y[,2:ncol(Y)] = matrix(Y_temp_reduce, rows=numGroups, cols=totalCells-1)
+ }
}
diff --git a/scripts/builtin/raJoin.dml b/scripts/builtin/raJoin.dml
index bddf94c019..2750b598f3 100644
--- a/scripts/builtin/raJoin.dml
+++ b/scripts/builtin/raJoin.dml
@@ -28,7 +28,7 @@
# colA Integer indicating the column index of matrix A to execute inner join command
# B Matrix of right left data [shape: N x M]
# colA Integer indicating the column index of matrix B to execute inner join command
-# method Join implementation method (nested-loop, sort-merge)
+# method Join implementation method (nested-loop, sort-merge, hash)
# ------------------------------------------------------------------------------
#
# OUTPUT:
@@ -37,7 +37,7 @@
# ------------------------------------------------------------------------------
m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
- Integer colB, String method="sort-merge")
+ Integer colB, String method)
return (Matrix[Double] Y)
{
# Sort the input Matrix with specific column in order to ensure same output order
@@ -65,8 +65,8 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
right = B[, colB]
# Sort join keys
- leftIdx = order(target = left, decreasing=FALSE)
- rightIdx = order(target = right, decreasing=FALSE)
+ leftIdx = seq(1, nrow(A))
+ rightIdx = seq(1, nrow(B))
# Ensure histograms are aligned by creating a common set of keys
commonKeys = max(max(left), max(right));
@@ -94,18 +94,15 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
# Determine the number of rows in outBucket
num_rows = nrow(outBucket)
- # Initialize a matrix to store the result
- updatedoffset = matrix(0, rows=num_rows, cols=1)
- leftOutIdx = matrix(0, rows=num_rows, cols=1)
- rightOutIdx = matrix(0, rows=num_rows, cols=1)
-
# Compute the element-wise subtraction and store in result
# TODO performance - try avoid iterating over rows
- for(i in 1:num_rows) {
- updatedoffset[i, 1] = offset[i, 1] - (cumHistMul[as.scalar(outBucket[i, 1]), 1] - histMul[as.scalar(outBucket[i, 1]), 1]) -1
- leftOutIdx[i, 1] = as.scalar(cumLeftHist[as.scalar(outBucket[i, 1]), 1] - leftHist[as.scalar(outBucket[i, 1]), 1] + floor(updatedoffset[i, 1] / rightHist[as.scalar(outBucket[i, 1]), 1])) +1
- rightOutIdx[i, 1] = as.scalar(cumRightHist[as.scalar(outBucket[i, 1]), 1] - rightHist[as.scalar(outBucket[i, 1]), 1] + (updatedoffset[i, 1] %% rightHist[as.scalar(outBucket[i, 1]), 1])) +1
- }
+ # create a mask to apply the outBucket value as an index of following matrix
+ seqMatrix = matrix(1, rows=nrow(outBucket), cols=1) %*% t(seq(1, nrow(cumHistMul)))
+ mask = (outBucket == seqMatrix)
+
+ updatedoffset = offset - (mask %*% (cumHistMul - histMul)) - 1
+ leftOutIdx = mask %*% (cumLeftHist - leftHist) + (floor(updatedoffset / mask %*% rightHist)) + 1
+ rightOutIdx = mask %*% (cumRightHist - rightHist) + (updatedoffset %% (mask %*% rightHist)) + 1
nrows = length(offset)
ncolsA = ncol(A)
@@ -118,8 +115,6 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
Y[i, (ncolsA + 1):(ncolsA + ncolsB)] = B[as.scalar(rightOutIdx[i, 1]), ]
}
}
- # TODO hash-based method which constructs permutation tables to replicate
- # tuples of lhs and rhs and simply concatenates these tuples via cbind
else{
Y = matrix(0, rows=0, cols=1)
}
@@ -152,7 +147,7 @@ m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
# Function to perform parallel binary search
parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul)
- return (Matrix[double] matched_result)
+return (Matrix[double] matched_result)
{
n = nrow(cumHistMul)
result = matrix(0, rows=nrow(offset), cols=1)
@@ -164,7 +159,8 @@ parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMu
if ( as.scalar(offset[i] <= cumHistMul[mid]) ) {
result[i] = mid
high = mid - 1
- } else {
+ }
+ else {
low = mid + 1
}
}
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java
index 6db3c46d3b..770f8c29e4 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaGroupbyTest.java
@@ -42,7 +42,46 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase
}
@Test
- public void testRaGroupbyTest() {
+ public void testRaGroupbyTest1() {
+ testRaGroupbyTest("nested-loop");
+ }
+
+ @Test
+ public void testRaGroupbyTest2() {
+ testRaGroupbyTest("permutation-matrix");
+ }
+
+ @Test
+ public void testRaGroupbyTestwithDifferentColumn1() {
+ testRaGroupbyTestwithDifferentColumn("nested-loop");
+ }
+
+ @Test
+ public void testRaGroupbyTestwithDifferentColumn2() {
+ testRaGroupbyTestwithDifferentColumn("permutation-matrix");
+ }
+
+ @Test
+ public void testRaGroupbyTestwithNoGroup1() {
+ testRaGroupbyTestwithNoGroup("nested-loop");
+ }
+
+ @Test
+ public void testRaGroupbyTestwithNoGroup2() {
+ testRaGroupbyTestwithNoGroup("permutation-matrix");
+ }
+
+ @Test
+ public void testRaGroupbyTestwithOneGroup1() {
+ testRaGroupbyTestwithOneGroup("nested-loop");
+ }
+
+ @Test
+ public void testRaGroupbyTestwithOneGroup2() {
+ testRaGroupbyTestwithOneGroup("permutation-matrix");
+ }
+
+ public void testRaGroupbyTest(String method) {
//generate actual dataset and variables
double[][] X = {
{1, 2, 3},
@@ -58,11 +97,10 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase
{4, 7, 8, 7, 8, 8, 9}
};
- runRaGroupbyTest(X, select_col, Y);
+ runRaGroupbyTest(X, select_col, Y, method);
}
- @Test
- public void testRaGroupbyTestwithDifferentColumn() {
+ public void testRaGroupbyTestwithDifferentColumn(String method) {
//generate actual dataset and variables
double[][] X = {
{1, 2, 3},
@@ -75,16 +113,15 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase
// Expected output matrix
double[][] Y = {
{2, 1, 3, 0, 0},
+ {8, 4, 9, 0, 0},
{3, 1, 6, 0, 0},
- {7, 4, 8, 4, 8},
- {8, 4, 9, 0, 0}
+ {7, 4, 8, 4, 8}
};
- runRaGroupbyTest(X, select_col, Y);
+ runRaGroupbyTest(X, select_col, Y, method);
}
- @Test
- public void testRaGroupbyTestwithNoGroup() {
+ public void testRaGroupbyTestwithNoGroup(String method) {
// Test case with different values in select_col
double[][] X = {
{1, 1, 1},
@@ -98,16 +135,15 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase
double[][] Y = {
{1, 1, 1},
{2, 2, 2},
- {3, 3, 1},
{4, 4, 2},
- {5, 5, 1}
+ {5, 5, 1},
+ {3, 3, 1}
};
- runRaGroupbyTest(X, select_col, Y);
+ runRaGroupbyTest(X, select_col, Y, method);
}
- @Test
- public void testRaGroupbyTestwithOneGroup() {
+ public void testRaGroupbyTestwithOneGroup(String method) {
//generate actual dataset and variables
double[][] X = {
{1, 2, 3, 8, 2},
@@ -122,10 +158,10 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase
{8, 1, 2, 3, 2, 4, 7, 8, 3, 1, 3, 6, 4, 4, 7,
8, 5, 4, 8, 9, 6},
};
- runRaGroupbyTest(X, select_col, Y);
+ runRaGroupbyTest(X, select_col, Y, method);
}
- private void runRaGroupbyTest(double [][] X, int col, double [][] Y)
+ private void runRaGroupbyTest(double [][] X, int col, double [][] Y, String method)
{
ExecMode platformOld = setExecMode(ExecMode.SINGLE_NODE);
@@ -135,16 +171,16 @@ public class BuiltinRaGroupbyTest extends AutomatedTestBase
String HOME = SCRIPT_DIR + TEST_DIR;
fullDMLScriptName = HOME + TEST_NAME + ".dml";
+
+ //test groupby methods
programArgs = new String[]{"-stats", "-args",
- input("X"), String.valueOf(col), output("result") };
- System.out.println(Arrays.deepToString(X));
- System.out.println(col);
+ input("X"), String.valueOf(col), method, output("result") };
+
//fullRScriptName = HOME + TEST_NAME + ".R";
//rCmd = "Rscript" + " " + fullRScriptName + " "
// + inputDir() + " " + col + " " + expectedDir();
writeInputMatrixWithMTD("X", X, true);
- System.out.println(Arrays.deepToString(X));
//writeExpectedMatrix("result", Y);
// run dmlScript and RScript
diff --git a/src/test/scripts/functions/builtin/raGroupby.dml b/src/test/scripts/functions/builtin/raGroupby.dml
index b93f9add14..4676e186a1 100644
--- a/src/test/scripts/functions/builtin/raGroupby.dml
+++ b/src/test/scripts/functions/builtin/raGroupby.dml
@@ -22,7 +22,6 @@
X = read($1)
col = as.integer($2)
-result = raGroupby(X, col, "nested-loop");
-write(result, $3);
-print(toString(result))
+result = raGroupby(X, col, $3);
+write(result, $4);