This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 8bed176bd7 [SYSTEMDS-3859] Improved Relational Algebra Builtin Functions
8bed176bd7 is described below
commit 8bed176bd7d5c8008c4e3f3a527ef0510f232834
Author: Max Rankl <[email protected]>
AuthorDate: Thu Aug 28 14:53:26 2025 +0200
[SYSTEMDS-3859] Improved Relational Algebra Builtin Functions
Closes #2284.
---
scripts/builtin/raGroupby.dml | 286 +++++++++++++++++++++++++++++-------------
1 file changed, 199 insertions(+), 87 deletions(-)
diff --git a/scripts/builtin/raGroupby.dml b/scripts/builtin/raGroupby.dml
index 0a23bf51ef..4128bcc2b9 100644
--- a/scripts/builtin/raGroupby.dml
+++ b/scripts/builtin/raGroupby.dml
@@ -37,117 +37,229 @@
m_raGroupby = function (Matrix[Double] X, Integer col, String method)
return (Matrix[Double] Y)
{
- if (method == "nested-loop") {
- # Extract and sort unique values from the specified column (1-based index)
- uniqueValues = unique(X[, col])
- order_uniqueValues = order(target = uniqueValues, by = 1);
+ if (method == "nested-loop") {
+ # Extract and sort unique group values from the specified column (1-based index)
+ groupsUnique = unique(X[, col])
+ groupsUniqueOrdered = order(target = groupsUnique, by = 1)
+ numGroups = nrow(groupsUnique)
+ maxRowsInGroup = max(table(X[,col],1));
- # Calcute the number of groups
- numGroups = nrow(uniqueValues)
+ # Define a zero output matrix, save the initial order of the groups, and sort increasingly
+ Y = matrix(0, numGroups, maxRowsInGroup*(ncol(X) - 1) + 1)
+ Y[,1] = groupsUnique
+ indicesY = order(target = Y, by = 1, index.return = TRUE)
+ Y = order(target = Y, by = 1, decreasing = FALSE, index.return = FALSE)
- # Determine the maximum number of rows in any group
- maxRowsInGroup = max(table(X[,col],1));
+ # Order the input matrix by the grouping column
+ indicesX = order(target = X, by = col, index.return = TRUE)
+ X = order(target = X, by = col, decreasing = FALSE, index.return = FALSE)
+
+ currentGroupX = 1
+ currentGroupY = 1
+ i = 1
+
+ # Iterate over the input matrix
+ while (numGroups > 0) {
+ currentGroup = as.scalar(Y[currentGroupX,1])
+ nRowsToCopy = 0
+
+ # Find the rows for the current group
+ group = 1
+ while (group > 0) {
+ # Break if there are no more rows left in X
+ if (i > nrow(X)) {
+ group = 0
+ }
+ # Check if the row belongs to the current group
+ else if (as.scalar(X[i, col]) == currentGroup) {
+ nRowsToCopy = nRowsToCopy + 1
+ i = i + 1
+ }
+ # Break if the row does not belong to the current group
+ else {
+ group = 0
+ }
+ }
+
+ # Copy the values into the output matrix
+ if (nRowsToCopy > 0) {
+ nRowsCurrentGroup = currentGroupY + nRowsToCopy - 1
+
+ # 1. Grouping column is the first column
+ if (col == 1) {
+ newMatrix = X[currentGroupY:nRowsCurrentGroup, (col+1):ncol(X)]
+ }
+ # 2. Grouping column is the last column
+ else if (col == ncol(X)) {
+ newMatrix = X [currentGroupY:nRowsCurrentGroup, 1:col-1]
+ }
+ # 3. Grouping column has an intermediate position
+ else {
+ newMatrix = cbind(X[currentGroupY:nRowsCurrentGroup, 1:(col-1)], X[currentGroupY:nRowsCurrentGroup, (col+1):ncol(X)])
+ }
+
+ # Flatten the new row
+ newRow = matrix(newMatrix, rows = 1, cols = nrow(newMatrix) * ncol(newMatrix))
+ newRowColIdx = nRowsToCopy * (ncol(X)-1)
- # Define a zero matrix to put the group data into
- Y = matrix(0,numGroups,maxRowsInGroup*(ncol(X)-1)+1)
-
- # Put the ordered uniqueValues into first column of Y as group_id
- #Y[,1] = order_uniqueValues
- Y[,1] = uniqueValues
-
- # Loop for each group
- for(i in 1:numGroups){
- index = 0
-
- # Iterate each row in matrix X to deal with group data
- for ( j in 1:nrow(X) ) {
- if ( as.scalar( X[j,col] == uniqueValues[i,1] )) {
- # Define the formula of the start and end column position
- startCol = index*(ncol(X)-1) +2
- endCol = startCol + (ncol(X)-2)
-
- if (col == 1) {
- # Case when the selected column is the first column
- Y[i,startCol:endCol] = X[j,2:ncol(X)]
- }
- else if (col == ncol(X)) {
- # Case when the selected column is the last column
- Y[i,startCol:endCol] = X[j,1:(ncol(X)-1)]
- }
- else {
- # General case
- newRow = cbind(X[j, 1:(col-1)], X[j, (col+1):ncol(X)])
- Y[i,startCol:endCol] = newRow
- }
- index = index +1
+ # Add the new row into Y at the current group
+ Y[currentGroupX, 2: (newRowColIdx + 1)] = newRow
}
- }
+
+ # Continue with the next group
+ currentGroupX = currentGroupX + 1
+ currentGroupY = currentGroupY + nRowsToCopy
+ numGroups = numGroups - 1
}
+
+ # Restore the initial order of X
+ X = cbind(X, indicesX)
+ nColX = ncol(X)
+ X = order(target = X, by= nColX)
+ X = X[, 1:nColX-1]
+
+ # Restore the initial order of Y
+ Y = cbind(Y, indicesY)
+ nColY = ncol(Y)
+ Y = order(target = Y, by= nColY)
+ Y = Y[, 1:nColY-1]
}
+
else if (method == "permutation-matrix") {
# Extract the grouping column and create unique groups
key = X[,col]
- key_unique = unique(X[, col])
- numGroups = nrow(key_unique)
+ keyUnique = unique(X[, col])
+ numGroups = nrow(keyUnique)
+ maxRowsInGroup = max(table(X[,col],1))
- # Matrix for comparison
- key_compare = key_unique %*% matrix(1, rows=1, cols=nrow(X))
- key_matrix = matrix(1, rows=nrow(key_unique), cols=1) %*% t(key)
+ # Calculate the frequency of each group
+ freqPerKey = table(key, 1)
+ freqPerKey = removeEmpty(target = freqPerKey, margin = "rows")
+ freqPerKeyIndices = order(target = keyUnique, by = 1, index.return = TRUE)
- # Find group index
- groupIndex = rowIndexMax(t(key_compare == key_matrix))
+ # Match the length of freqPerKey to keyUnique and sort it accordingly
+ freqPerKey = cbind(freqPerKey, freqPerKeyIndices)
+ nColFpk = ncol(freqPerKey)
+ freqPerKey = order(target = freqPerKey, by= nColFpk)
+ freqPerKey = freqPerKey[, 1:nColFpk-1]
+ freqPerKey = t(freqPerKey)
- # Determine the maximum number of rows in any group
- maxRowsInGroup = max(table(X[,col],1))
- totalCells = (maxRowsInGroup) * (ncol(X)-1) +1
+ # Find the group with the most values
+ groupMaxVal = maxRowsInGroup*(ncol(X)-1)+1
+ groupMaxValKey = max(freqPerKey)
+
+ # Calculate the amount of rows that need padding and the amount of padding per key
+ groupMaxValKeySeq = matrix(groupMaxValKey, nrow(freqPerKey), ncol(freqPerKey))
+ missingPadding = groupMaxValKeySeq - freqPerKey
+ amountOfZeroRows = sum(missingPadding)
+
+ # 1. Padding is required
+ if (amountOfZeroRows > 0) {
+ missingPadding = t(missingPadding)
- # Create permutation matrix P copy relevant tuples with a single matrix multiplication
- P = matrix(0, rows=nrow(X), cols=numGroups * maxRowsInGroup)
- # Create offsets to store the first column of each group
- offsets = matrix(seq(0, (numGroups-1)*maxRowsInGroup, maxRowsInGroup), rows=numGroups, cols=1)
+ # Remove the keys that dont need padding
+ removeMask = (missingPadding != 0)
+ missingPadding = cbind(keyUnique, missingPadding)
+ missingPadding = removeEmpty(target = missingPadding, margin = "rows", select = removeMask)
- # Create row and column index for the permutation matrix
- rowIndex = seq(1, nrow(X))
- indexWithInGroups = cumsum(t(table(groupIndex, seq(1, nrow(X)), numGroups, nrow(X))))
- selectedMatrix = table(seq(1, nrow(indexWithInGroups)), groupIndex)
- colIndex = groupIndex * maxRowsInGroup - maxRowsInGroup + rowSums(indexWithInGroups * selectedMatrix)
+ # Keys that need padding and padding length per group
+ keysPadding = missingPadding[,1]
+ missingPadding = missingPadding[,2]
+ repeatKeys = matrix(0, rows=amountOfZeroRows, cols=1)
- # Set values in P
- P = table(seq(1, nrow(X)), colIndex)
+ # Generate the repeating keys
+ repeatKeysIdxS = 1
- # Perform matrix multiplication
- Y_temp = t(P) %*% X
+ for (i in 1:nrow(missingPadding)) {
+ repeat_count = as.scalar(missingPadding[i,1])
+ if (repeat_count > 0) {
+ temp = matrix(as.scalar(keysPadding[i, 1]), rows=repeat_count, cols = 1)
+ repeatKeysIdxE = repeatKeysIdxS + repeat_count - 1
+ repeatKeys[repeatKeysIdxS:repeatKeysIdxE, 1] = temp
+ repeatKeysIdxS = repeatKeysIdxE + 1
+ }
+ }
+
+ # Combine the keys that need padding with the actual padding
+ padding = matrix(0, rows = nrow(repeatKeys), cols = 1)
+ padding = cbind(repeatKeys, padding)
+
+ # Extend the existing keys to a second column to match the padded keys
+ key = key %*% matrix(1, rows = 1, cols = 2)
+
+ # Combine the keys with the padded keys and sort them increasingly
+ tempY = rbind(key, padding)
+ tempY = order(target = tempY, by = 1, decreasing = FALSE, index.return = FALSE)
+
+ # Remove the padded rows and save the Indices of the combined keys for the permutation matrix
+ paddedRows = tempY[, 2]
+ tempIndicesY = order(target = tempY, by = 1, decreasing = FALSE, index.return = TRUE)
+ tempIndicesY = removeEmpty(target = tempIndicesY, margin = "rows", select = (paddedRows!=0))
- # Remove the selected column from Y_temp
- if( col == 1 ) {
- Y_temp_reduce = Y_temp[, col+1:ncol(Y_temp)]
+ # Create the permutation matrix by using the Indices of the combined keys
+ P = table(seq(1, nrow(X)), tempIndicesY)
+
+ # Order the initial matrix to match the sorted keys with padding
+ indicesX = order(target = X, by = col, index.return = TRUE)
+ X = order(target = X, by = col, decreasing = FALSE, index.return = FALSE)
+ X = order(target = X, by = col, decreasing = FALSE, index.return = FALSE)
+
+ # Perform the matrix multiplication
+ tempY = t(P) %*% X
}
- else if( col == ncol(X) ) {
- Y_temp_reduce = Y_temp[, 1:col-1]
+
+ # 2. Padding is not required
+ else {
+ tempY = X
+ tempY = order(target = tempY, by = col, decreasing = FALSE, index.return = FALSE)
}
- else{
- Y_temp_reduce = cbind(Y_temp[, 1:col-1],Y_temp[, col+1:ncol(Y_temp)])
+
+ # Remove the selected column from tempY
+ if (col == 1) {
+ tempY = tempY[, col+1:ncol(tempY)]
+ }
+ else if (col == ncol(X)) {
+ tempY = tempY[, 1:col-1]
+ }
+ else {
+ tempY = cbind(tempY[, 1:col-1],tempY[, col+1:ncol(tempY)])
}
- # Set value of final output
- Y = matrix(0, rows=numGroups, cols=totalCells)
- Y[,1] = key_unique
+ # Set the value of the final output
+ Y = matrix(0, rows=numGroups, cols=groupMaxVal)
+ Y[,1] = keyUnique
- # The permutation matrix creates a structure where each group's data
- # may not fill exactly maxRowsInGroup rows.
- # If needed, we need to pad to the expected size first.
+ # Each group's data may not fill exactly maxRowsInGroup rows
+ # If needed, we need to pad to the expected size first
expectedRows = numGroups * maxRowsInGroup
- actualRows = nrow(Y_temp_reduce)
-
- if(actualRows < expectedRows) {
- # Pad Y_temp_reduce with zeros to match expected structure
- Y_tmp_padded = matrix(0, rows=expectedRows, cols=ncol(Y_temp_reduce))
- Y_tmp_padded[1:actualRows,] = Y_temp_reduce
- } else {
- Y_tmp_padded = Y_temp_reduce
+ actualRows = nrow(tempY)
+
+ if (actualRows < expectedRows) {
+ # Pad tempY with zeros to match expected structure
+ tempYPadded = matrix(0, rows=expectedRows, cols=ncol(tempY))
+ tempYPadded[1:actualRows,] = tempY
+ }
+ else {
+ tempYPadded = tempY
}
- Y[,2:ncol(Y)] = matrix(Y_tmp_padded, rows=numGroups, cols=totalCells-1)
+ # Save the initial order of the groups in Y and order Y to match the sorted tempYPadded
+ indicesY = order(target = Y, by = 1, index.return = TRUE)
+ Y = order(target = Y, by = 1, decreasing = FALSE, index.return = FALSE)
+
+ # Copy the values into Y
+ Y[,2:ncol(Y)] = matrix(tempYPadded, rows=numGroups, cols=groupMaxVal-1)
+
+ # Restore the initial order of X
+ X = cbind(X, indicesX)
+ nColX = ncol(X)
+ X = order(target = X, by= nColX)
+ X = X[, 1:nColX-1]
+
+ # Restore the initial order of Y
+ Y = cbind(Y, indicesY)
+ nColY = ncol(Y)
+ Y = order(target = Y, by= nColY)
+ Y = Y[, 1:nColY-1]
}
}
-