This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new f81b76de74 [SYSTEMDS-3708] Additional sort-merge raJoin method
f81b76de74 is described below
commit f81b76de742b1815f949c9d2aa9a13eb752499f2
Author: gghsu <[email protected]>
AuthorDate: Sat Jul 6 20:13:19 2024 +0200
[SYSTEMDS-3708] Additional sort-merge raJoin method
LDE project SoSe'24, part III.
Closes 2044.
---
scripts/builtin/raJoin.dml | 118 ++++++++++++++++++---
.../functions/builtin/part2/BuiltinRaJoinTest.java | 6 +-
src/test/scripts/functions/builtin/raJoin.dml | 2 +-
3 files changed, 111 insertions(+), 15 deletions(-)
diff --git a/scripts/builtin/raJoin.dml b/scripts/builtin/raJoin.dml
index 333a1f3e8d..b7b299e7a8 100644
--- a/scripts/builtin/raJoin.dml
+++ b/scripts/builtin/raJoin.dml
@@ -28,7 +28,7 @@
# colA Integer indicating the column index of matrix A to execute inner
join command
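-# B Matrix of right left data [shape: N x M]
-# colA Integer indicating the column index of matrix B to execute inner join command
+# B Matrix of right data [shape: N x M]
+# colB Integer indicating the column index of matrix B to execute inner join command
-# method Join implementation method (nested-loop)
+# method Join implementation method (nested-loop, sort-merge; default: sort-merge)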
#
------------------------------------------------------------------------------
#
# OUTPUT:
@@ -37,21 +37,115 @@
#
------------------------------------------------------------------------------
m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
- Integer colB, String method="nested-loop")
+ Integer colB, String method="sort-merge")
return (Matrix[Double] Y)
{
- # matrix of result data
- Y = matrix(0, rows=0, cols=ncol(A) + ncol(B) )
-
- for (i in 1:nrow(A)) {
- for (j in 1:nrow(B)) {
- if (as.scalar(A[i, colA] == B[j, colB])) {
- # Combine the matching row from A and B to match
- match = cbind(A[i,], B[j,])
- # merge the match row into result Y
- Y = rbind(Y, match)
+  # Reject unsupported methods up front; otherwise Y would be left undefined.
+  if (method != "nested-loop" & method != "sort-merge") {
+    stop("raJoin: unsupported method '" + method + "', expected 'nested-loop' or 'sort-merge'")
+  }
+
+  # Sort both inputs by their join column so that every method emits the
+  # result in the same (ascending join key) order.
+  A = order(target = A, by = colA, decreasing=FALSE, index.return=FALSE)
+  B = order(target = B, by = colB, decreasing=FALSE, index.return=FALSE)
+
+  if (method == "nested-loop") {
+    # Compare every row of A with every row of B and append each matching pair.
+    Y = matrix(0, rows=0, cols=ncol(A) + ncol(B))
+    for (i in 1:nrow(A)) {
+      for (j in 1:nrow(B)) {
+        if (as.scalar(A[i, colA] == B[j, colB])) {
+          Y = rbind(Y, cbind(A[i,], B[j,]))
+        }
+      }
+    }
+  }
+  else {
+    # sort-merge: join through key histograms instead of pairwise comparisons.
+    left = A[, colA]
+    right = B[, colB]
+
+    # Key values are used directly as row indices into the histograms below,
+    # so only positive integer keys are supported (NaN keys are rejected too).
+    numBadKeys = sum(left < 1) + sum(right < 1) + sum(left != round(left)) + sum(right != round(right))
+    if (numBadKeys > 0) {
+      stop("raJoin: sort-merge requires positive integer join keys; use method='nested-loop' otherwise")
+    }
+
+    # Histograms over the shared key domain 1..numKeys (row k counts key k)
+    numKeys = max(max(left), max(right))
+    leftHist = table(left, 1, numKeys, 1)
+    rightHist = table(right, 1, numKeys, 1)
+
+    # Number of output rows contributed by each key (size of its cross product)
+    histMul = leftHist * rightHist
+
+    # Prefix sums: cumLeftHist/cumRightHist give the last row of each key in the
+    # sorted A/B, cumHistMul gives the last output row produced by each key.
+    cumLeftHist = cumsum(leftHist)
+    cumRightHist = cumsum(rightHist)
+    cumHistMul = cumsum(histMul)
+
+    outSize = as.scalar(cumHistMul[numKeys, 1])
+    if (outSize > 0) {
+      offset = seq(1, outSize, 1)
+
+      # Key (bucket) that produces each output row
+      outBucket = parallelBinarySearch(offset, cumHistMul)
+
+      # Gather per-bucket statistics for all output rows at once through a
+      # 0/1 selection matrix whose row i picks bucket outBucket[i].
+      P = table(offset, outBucket, outSize, numKeys)
+      bucketMulStart = P %*% (cumHistMul - histMul)
+      bucketLeftStart = P %*% (cumLeftHist - leftHist)
+      bucketRightStart = P %*% (cumRightHist - rightHist)
+      bucketRightCnt = P %*% rightHist
+
+      # Zero-based position of each output row inside its bucket's cross
+      # product, split into a left row (slow index) and a right row (fast index).
+      pos = offset - bucketMulStart - 1
+      leftOutIdx = bucketLeftStart + floor(pos / bucketRightCnt) + 1
+      rightOutIdx = bucketRightStart + (pos %% bucketRightCnt) + 1
+
+      ncolsA = ncol(A)
+      ncolsB = ncol(B)
+      Y = matrix(0, rows=outSize, cols=ncolsA + ncolsB)
+      # Copy rows by indexing rather than via a selection-matrix product: with a
+      # product, 0 * NaN / 0 * Inf could leak non-finite input values into other rows.
+      for (i in 1:outSize) {
+        Y[i, 1:ncolsA] = A[as.scalar(leftOutIdx[i, 1]), ]
+        Y[i, (ncolsA + 1):(ncolsA + ncolsB)] = B[as.scalar(rightOutIdx[i, 1]), ]
}
+    }
+    else {
+      # No matching keys: empty result that still carries the full output schema
+      Y = matrix(0, rows=0, cols=ncol(A) + ncol(B))
}
}
}
+# Binary search of each offset value in the prefix-sum vector cumHistMul.
+# For every row i of offset, returns the smallest row index k with
+# offset[i] <= cumHistMul[k] (assuming cumHistMul is non-decreasing, as a
+# cumsum of counts is), or 0 when no such k exists.
+# Note: despite the name, the searches run one after another in a plain loop.
+parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul)
+  return (Matrix[double] matched_result)
+{
+  numBuckets = nrow(cumHistMul)
+  found = matrix(0, rows=nrow(offset), cols=1)
+  for (k in 1:nrow(offset)) {
+    target = as.scalar(offset[k, 1])
+    lo = 1
+    hi = numBuckets
+    while (lo <= hi) {
+      mid = as.integer((lo + hi) / 2)
+      if (target <= as.scalar(cumHistMul[mid, 1])) {
+        # mid qualifies; remember it and keep looking for an earlier bucket
+        found[k, 1] = mid
+        hi = mid - 1
+      }
+      else {
+        lo = mid + 1
+      }
+    }
+  }
+  matched_result = found
+}
+
diff --git
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
index 6c5ea9d8ac..f19262b854 100644
---
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
@@ -41,6 +41,8 @@ public class BuiltinRaJoinTest extends AutomatedTestBase
addTestConfiguration(TEST_NAME,new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"result"}));
}
+ // TODO test all join methods
+
@Test
public void testRaJoinTest() {
//generate actual dataset and variables
@@ -64,9 +66,9 @@ public class BuiltinRaJoinTest extends AutomatedTestBase
// Expected output matrix
double[][] Y = {
{1, 2, 3, 1, 2, 9},
+ {1, 3, 6, 1, 2, 9},
{4, 7, 8, 4, 7, 8},
{4, 7, 8, 4, 5, 10},
- {1, 3, 6, 1, 2, 9},
{4, 3, 5, 4, 7, 8},
{4, 3, 5, 4, 5, 10},
};
@@ -107,7 +109,7 @@ public class BuiltinRaJoinTest extends AutomatedTestBase
double[][] A = {
{1, 2, 3, 4, 5},
{6, 7, 8, 9, 10},
- {11, 12, 13, 14, 15},
+ {11, 12, 13, 14, 8},
{16, 17, 18, 19, 20},
{21, 22, 23, 24, 25}
};
diff --git a/src/test/scripts/functions/builtin/raJoin.dml
b/src/test/scripts/functions/builtin/raJoin.dml
index 08483b8ea8..63aa2807c8 100644
--- a/src/test/scripts/functions/builtin/raJoin.dml
+++ b/src/test/scripts/functions/builtin/raJoin.dml
@@ -24,6 +24,6 @@ colA = as.integer($2)
B = read($3)
colB = as.integer($4)
-result = raJoin(A, colA, B, colB, "nested-loop");
+result = raJoin(A, colA, B, colB, "sort-merge");
write(result, $5);