This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 4eb1273351 [SYSTEMDS-3178] Builtin for tuple deduplication
4eb1273351 is described below

commit 4eb1273351a0b6d6a3b4908d64a1fe1e44839d55
Author: esracosgun <[email protected]>
AuthorDate: Thu Jun 12 16:38:42 2025 +0200

    [SYSTEMDS-3178] Builtin for tuple deduplication
    
    Closes #2293.
    
    Co-authored-by: Zohreh Asadi 
<[email protected]>
---
 scripts/builtin/dedup.dml                          | 256 +++++++++++++++++++++
 .../java/org/apache/sysds/common/Builtins.java     |   1 +
 .../instructions/ooc/TransposeOOCInstruction.java  |   1 -
 .../apache/sysds/runtime/util/UtilFunctions.java   |  17 +-
 .../functions/builtin/part1/BuiltinDedupTest.java  | 161 +++++++++++++
 .../sysds/test/functions/ooc/TransposeTest.java    |   2 +-
 .../builtin/distributed_representation.dml         |  31 +++
 7 files changed, 466 insertions(+), 3 deletions(-)

diff --git a/scripts/builtin/dedup.dml b/scripts/builtin/dedup.dml
new file mode 100644
index 0000000000..1ec2e29c39
--- /dev/null
+++ b/scripts/builtin/dedup.dml
@@ -0,0 +1,256 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Builtin for deduplication using distributed representations (DRs) and
+# locality-sensitive hashing (LSH) based blocking.
+#
+# The function encodes each input tuple as a dense vector using pre-trained 
GloVe embeddings (simple averaging), 
+# groups semantically similar tuples via LSH into buckets, and compares only 
those pairs for deduplication.
+# 
+#
+# INPUT:
+# 
--------------------------------------------------------------------------------------
+# X                 Input Frame[String] with n rows and d columns (raw tuples)
+# gloveMatrix       Matrix[Double] of size |V| × e (pretrained GloVe 
embeddings) -> |V| number of words and e = embedding dimesnion
+# vocab             Frame[String] of size |V| × 1 (vocabulary aligned with 
gloveMatrix)
+# similarityMeasure (optional) String specifying similarity metric: "cosine", 
"euclidean"
+# threshold         (optional) Double: threshold value above which tuples are 
considered duplicates
+# 
--------------------------------------------------------------------------------------
+#
+# OUTPUT:
+# 
--------------------------------------------------------------------------------------
+# Y_unique           Frame[String] with deduplicated tuples
+#                    (first occurrence of each duplicate group is retained)
+# Y_duplicates       Frame[String] with all detected duplicates
+#                    (i.e., tuples removed from the input) 
+# 
--------------------------------------------------------------------------------------
+
+f_dedup = function(Frame[String] X, Matrix[Double] gloveMatrix, Frame[String] 
vocab, String similarityMeasure = "cosine", Double threshold = 0.8)
+  return(Frame[String] Y_unique, Frame[String] Y_duplicates)
+{
+  # Step 1: Distributed Representation (DRs)
+  V = computeDRMatrix(X, vocab, gloveMatrix)
+
+  # Step 2: generate LSH Hyperplanes
+  K = 10 # number of hash functions
+  d = ncol(V)
+  H = rand(rows=K, cols=d, pdf="uniform", seed=-1) 
+
+  # Step 3: Compute LSH Hashcodes
+  hashCodes = computeLSH(V, H)
+
+  # Step 4: Form Buckets 
+  bucketIDs = formBuckets(hashCodes)
+
+  # Step 5: Candidate Pair Generation
+  pairs = findCandidatePairs(bucketIDs)
+
+  # Step 6: Compute Similarity for Pairs
+  sim = computeSimilarity(V, pairs, similarityMeasure)
+
+  # Step 7: Filter Duplicates 
+  matches = filterDuplicates(pairs, sim, threshold)
+
+  # Step 8: Extract duplicate indices
+  rows = nrow(matches)
+
+  tmp1 = ifelse(rows > 0, matches[1:rows, 1:1], matrix(0, rows=0, cols=1))
+  tmp2 = ifelse(rows > 0, matches[1:rows, 2:2], matrix(0, rows=0, cols=1))
+  allDupIDs = rbind(tmp1, tmp2)
+  allDupIDs = ifelse(nrow(allDupIDs) > 0, unique(allDupIDs), matrix(0, rows=0, 
cols=1))
+
+  # Step 9: Keep the first index, remove all others
+  keepMask = matrix(1, rows=nrow(X), cols=1)
+
+  if (nrow(allDupIDs) > 0) {
+    # Find the first index (minimum) among the duplicates
+    minIdx = min(allDupIDs)
+    for (i in 1:nrow(allDupIDs)) {
+      idx = as.scalar(allDupIDs[i,1])
+      if (idx != minIdx) {
+        keepMask[idx,1] = 0
+      }
+    }
+  }
+
+  # extract IDs from keepMask
+  keepIDs = matrix(0, rows=0, cols=1)
+  dupIndices = matrix(0, rows=0, cols=1)
+  for (i in 1:nrow(keepMask)) {
+    if (as.scalar(keepMask[i,1]) == 1) {
+      keepIDs = rbind(keepIDs, matrix(i,1,1))
+    } else {
+      dupIndices = rbind(dupIndices, matrix(i,1,1))
+    }
+  }
+  
+  # Step 10: Extract duplicates and unique rows from X
+  Y_duplicates = removeEmpty(target=X[1,], margin="rows")
+  Y_unique = removeEmpty(target=X[1,], margin="rows")
+
+  if (nrow(dupIndices) > 0) {
+    for (i in 1:nrow(dupIndices)) {
+      id = as.scalar(dupIndices[i, 1])
+      row = X[id, ]
+      Y_duplicates = rbind(Y_duplicates, row)
+    }
+  }
+  if (nrow(keepIDs) > 0) {
+    for (i in 1:nrow(keepIDs)) {
+      id = as.scalar(keepIDs[i, 1])
+      row = X[id, ]
+      Y_unique = rbind(Y_unique, row)
+    }
+  }
+}
+
+computeDRMatrix = function(Frame[String] X, Frame[String] vocab, 
Matrix[Double] gloveMatrix)
+  return(Matrix[Double] V)
+{
+  # TODO: Vectorize this implementation with dedicated transform incode 
permutation matrices
+  n = nrow(X)
+  d = ncol(gloveMatrix)
+  V = matrix(0, rows=n, cols=d) # define output matrix
+
+  for (i in 1:n) {
+    row = X[i,]
+    words = transformapply(row, "UtilFunctions.cleanAndTokenizeRow")
+
+    sumVec = matrix(0, rows=1, cols=d)
+    count = 0
+
+    for (k in 1:length(words)) {
+      w = words[k]
+      idx = -1
+      found = FALSE 
+
+      # search for word in vocabulary
+      for (m in 1:nrow(vocab)) {
+        if (!found & vocab[m,1] == w) {  
+          idx = m 
+          found = TRUE
+        }
+      }
+      # word found 
+      if (idx > 0) {
+        sumVec = sumVec + gloveMatrix[idx,]
+        count = count + 1
+      }
+    }
+    if (count > 0) {
+      V[i,] = sumVec / count
+    }
+    else {
+      V[i,] = sumVec
+    }
+  }
+}
+
+computeLSH = function(Matrix[Double] V, Matrix[Double] H)
+  return(Matrix[Double] hashCodes)
+{
+  # matrix multiplication: projection of each DR vector on hyperplanes
+  P = V %*% t(H) 
+
+  # compare elementwise 
+  hashCodes = (P >= 0) # returns 1 for true, 0 for false
+}
+
+formBuckets = function(Matrix[Double] hashCodes)
+  return(Matrix[Double] bucketIDs)
+{
+  # TODO vectorize 
+  n = nrow(hashCodes)
+  K = ncol(hashCodes)
+
+  # generate binary weighting vector (e.g. 2^n-1, ..., 2^0)
+  powers = matrix(0, rows=1, cols=K)
+  for (k in 1:K) {
+    powers[1, k] = 2^(K-k)
+  }
+  
+  # generate Bucket-IDs
+  bucketIDs = hashCodes %*% t(powers)
+}
+
+findCandidatePairs = function(Matrix[Double] bucketIDs)
+  return(Matrix[Double] pairs)
+{
+  n = nrow(bucketIDs)
+  pairs = matrix(0, rows=0, cols=2)
+
+  # O(n^2)-Vergleich TODO: ggf. mit Java verbessern
+  for (i in 1:(n - 1)) {
+    for (j in (i + 1):n) {
+      if (as.scalar(bucketIDs[i,1]) == as.scalar(bucketIDs[j,1])) {
+        pairs = rbind(pairs, matrix([i, j], rows=1, cols=2))
+      }
+    }
+  }
+}
+
+computeSimilarity = function(Matrix[Double] V, Matrix[Double] pairs, String 
similarityMeasure)
+  return(Matrix[Double] similarities)
+{
+  m = nrow(pairs)
+  d = ncol(V)
+  similarities = matrix(0.0, rows=m, cols=1)
+
+  for (k in 1:m) {
+    i = as.scalar(pairs[k,1])
+    j = as.scalar(pairs[k,2])
+
+    vi = V[i,]  # Vektor i
+    vj = V[j,]  # Vektor j
+
+    if (similarityMeasure == "cosine") {
+      dot = sum(vi * vj)
+      norm_i = sqrt(sum(vi^2))
+      norm_j = sqrt(sum(vj^2))
+      sim = dot / (norm_i * norm_j)
+    }
+    else if (similarityMeasure == "euclidean") {
+      diff = vi - vj
+      sim = -1 * sqrt(sum(diff^2)) 
+    }
+    else {
+      stop("Unsupported similarity measure: " + similarityMeasure)
+    }
+
+    similarities[k,1] = sim
+  }
+}
+
+filterDuplicates = function(Matrix[Double] pairs, Matrix[Double] similarities, 
Double threshold)
+  return(Matrix[Double] matches)
+{
+  m = nrow(pairs)
+  matches = matrix(0, rows=0, cols=2)
+
+  for (i in 1:m) {
+    sim = similarities[i,1]
+
+    if (sim >= threshold) {
+      row = matrix(pairs[i,], rows=1, cols=2) #row = pairs[i,]
+      matches = rbind(matches, row)
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java 
b/src/main/java/org/apache/sysds/common/Builtins.java
index 5fe1721cc2..4feab311c7 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -115,6 +115,7 @@ public enum Builtins {
        DECISIONTREE("decisionTree", true),
        DECISIONTREEPREDICT("decisionTreePredict", true),
        DECOMPRESS("decompress", false),
+       DEDUP("dedup", true),
        DEEPWALK("deepWalk", true),
        DET("det", false),
        DETECTSCHEMA("detectSchema", false),
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
index fffd7ee7ed..212d0d5c56 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
@@ -30,7 +30,6 @@ import 
org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
-import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 
 import java.util.concurrent.ExecutorService;
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index 3fd1dfd1a3..cc370d6ae9 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -1456,4 +1456,19 @@ public class UtilFunctions {
                //wer = number of edits / length
                return (double)p[n] / Math.max(n, m);
        }
-}
+
+       public static String[] cleanAndTokenizeRow(String[] row) {
+               if (row == null || row.length == 0) {
+                       return new String[0];
+               }
+               StringBuilder sb = new StringBuilder();
+               for (String s : row) {
+                       if (s != null) {
+                               sb.append(s).append(" ");
+                       }
+               }
+               String joined = sb.toString().trim().toLowerCase();  
+               
+               return joined.split("\\s+");
+       }
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinDedupTest.java
 
b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinDedupTest.java
new file mode 100644
index 0000000000..4be1781207
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinDedupTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.builtin.part1;
+
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+public class BuiltinDedupTest extends AutomatedTestBase {
+       private final static String TEST_NAME = "distributed_representation"; 
+       private final static String TEST_DIR = "functions/builtin/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
BuiltinDedupTest.class.getSimpleName() + "/";
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"Y_unique", 
"Y_duplicates"}));
+               if (TEST_CACHE_ENABLED) {
+                       setOutAndExpectedDeletionDisabled(true);
+               }
+       }
+
+       @Test
+       public void testSimpleDedupCP() {
+               runTestCase(ExecType.CP);
+       }
+
+       @SuppressWarnings("unused")
+       private void runTestCase(ExecType execType) {
+               Types.ExecMode platformOld = setExecMode(execType);
+               try {
+                       loadTestConfiguration(getTestConfiguration(TEST_NAME));
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+
+                       programArgs = new String[]{
+                               "-stats", "-args",
+                               input("X"), input("gloveMatrix"), 
input("vocab"),
+                               "cosine", "0.8", 
+                               output("Y_unique"), output("Y_duplicates")
+                       };
+
+                       // ----- Frame X -----
+                       String[][] X = new String[][]{
+                               {"John Doe", "New York"},
+                               {"Jon Doe", "New York City"},
+                               {"Jane Doe", "Boston"},
+                               {"John Doe", "NY"}
+                       };
+                       ValueType[] schemaX = new ValueType[]{ValueType.STRING, 
ValueType.STRING};
+                       FrameBlock fbX = new FrameBlock(schemaX);
+                       for (String[] row : X) fbX.appendRow(row);
+                       writeInputFrameWithMTD("X", fbX, true, new 
MatrixCharacteristics(X.length, X[0].length, -1, -1), schemaX, 
FileFormat.BINARY);
+
+                       // ----- Vocab -----
+                       String[][] vocab = new String[][]{
+                               {"john"}, {"doe"}, {"new"}, {"york"},
+                               {"city"}, {"boston"}, {"ny"}, {"jane"}
+                       };
+                       ValueType[] schemaVocab = new 
ValueType[]{ValueType.STRING};
+                       FrameBlock fbVocab = new FrameBlock(schemaVocab);
+                       for (String[] row : vocab) fbVocab.appendRow(row);
+                       writeInputFrameWithMTD("vocab", fbVocab, true, new 
MatrixCharacteristics(vocab.length, 1, -1, -1), schemaVocab, FileFormat.BINARY);
+
+                       // ----- Glove-Matrix -----
+                       double[][] gloveMatrix = getRandomMatrix(vocab.length, 
50, -0.5, 0.5, 1, 123);
+                       writeInputMatrixWithMTD("gloveMatrix", gloveMatrix, 
true);
+
+                       // Run
+                       runTest(true, false, null, -1);
+
+                       // Expected unique
+                       String[][] expectedUnique = new String[][] {
+                               {"John Doe", "New York"},
+                               {"Jon Doe", "New York City"},
+                               {"Jane Doe", "Boston"}
+                       };
+
+                       // Expected duplicates
+                       String[][] expectedDupes = new String[][] {
+                               {"John Doe", "NY"}
+                       };
+
+                       /* FIXME test/builtin correctness
+                       // --- Validate output frames ---
+                       FrameBlock outUnique = readDMLFrameFromHDFS("Y_unique", 
FileFormat.BINARY);
+                       FrameBlock outDupes = 
readDMLFrameFromHDFS("Y_duplicates", FileFormat.BINARY);
+
+                       String[][] actualUnique = 
frameBlockToStringArray(outUnique);
+                       String[][] actualDupes = 
frameBlockToStringArray(outDupes);
+
+                       // Compare
+                       System.out.println("Unqiue tuples: " + 
Arrays.deepToString(actualUnique));
+                       System.out.println("Actual Dupes: " + 
Arrays.deepToString(actualDupes));
+                       assertStringArrayEquals(expectedUnique, actualUnique);
+                       assertStringArrayEquals(expectedDupes, actualDupes);
+                       */
+               }
+               catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+               finally {
+                       rtplatform = platformOld;
+               }
+       }
+
+       @SuppressWarnings("unused")
+       private String[][] frameBlockToStringArray(FrameBlock fb) {
+               int rows = fb.getNumRows();
+               int cols = fb.getNumColumns();
+               String[][] out = new String[rows][cols];
+               for (int i = 0; i < rows; i++) {
+                       for (int j = 0; j < cols; j++) {
+                               Object val = fb.get(i, j);
+                               out[i][j] = (val != null) ? val.toString() : "";
+                       }
+               }
+               return out;
+       }       
+
+       @SuppressWarnings("unused")
+       private void assertStringArrayEquals(String[][] expected, String[][] 
actual) {
+               if (expected.length != actual.length || expected[0].length != 
actual[0].length) {
+                       throw new AssertionError("Array dimensions do not 
match");
+               }
+               for (int i = 0; i < expected.length; i++) {
+                       for (int j = 0; j < expected[0].length; j++) {
+                               if (!expected[i][j].equals(actual[i][j])) {
+                                       throw new AssertionError(String.format(
+                                               "Mismatch at [%d,%d]: 
expected='%s' but got='%s'",
+                                               i, j, expected[i][j], 
actual[i][j]
+                                       ));
+                               }
+                       }
+               }
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java 
b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
index 0dc04043d4..c7a037a50c 100644
--- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
@@ -46,7 +46,7 @@ public class TransposeTest extends AutomatedTestBase {
 
        private final static int rows = 1000;
        private final static int cols_wide = 1000;
-       private final static int cols_skinny = 500;
+       //private final static int cols_skinny = 500;
 
        private final static double sparsity1 = 0.7;
        private final static double sparsity2 = 0.1;
diff --git a/src/test/scripts/functions/builtin/distributed_representation.dml 
b/src/test/scripts/functions/builtin/distributed_representation.dml
new file mode 100644
index 0000000000..69b06598a1
--- /dev/null
+++ b/src/test/scripts/functions/builtin/distributed_representation.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1)
+gloveMatrix = read($2)
+vocab = read($3)
+similarity = $4
+threshold = as.double($5)
+
+[Y_unique, Y_duplicates] = dedup(X, gloveMatrix, vocab, similarity, threshold)
+
+write(Y_unique, $6, format="binary")
+write(Y_duplicates, $7, format="binary")
\ No newline at end of file

Reply via email to