This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 4eb1273351 [SYSTEMDS-3178] Builtin for tuple deduplication
4eb1273351 is described below
commit 4eb1273351a0b6d6a3b4908d64a1fe1e44839d55
Author: esracosgun <[email protected]>
AuthorDate: Thu Jun 12 16:38:42 2025 +0200
[SYSTEMDS-3178] Builtin for tuple deduplication
Closes #2293.
Co-authored-by: Zohreh Asadi <[email protected]>
---
scripts/builtin/dedup.dml | 256 +++++++++++++++++++++
.../java/org/apache/sysds/common/Builtins.java | 1 +
.../instructions/ooc/TransposeOOCInstruction.java | 1 -
.../apache/sysds/runtime/util/UtilFunctions.java | 17 +-
.../functions/builtin/part1/BuiltinDedupTest.java | 161 +++++++++++++
.../sysds/test/functions/ooc/TransposeTest.java | 2 +-
.../builtin/distributed_representation.dml | 31 +++
7 files changed, 466 insertions(+), 3 deletions(-)
diff --git a/scripts/builtin/dedup.dml b/scripts/builtin/dedup.dml
new file mode 100644
index 0000000000..1ec2e29c39
--- /dev/null
+++ b/scripts/builtin/dedup.dml
@@ -0,0 +1,256 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Builtin for deduplication using distributed representations (DRs) and
+# locality-sensitive hashing (LSH) based blocking.
+#
+# The function encodes each input tuple as a dense vector using pre-trained GloVe embeddings (simple averaging),
+# groups semantically similar tuples via LSH into buckets, and compares only those pairs for deduplication.
+#
+#
+# INPUT:
+# --------------------------------------------------------------------------------------
+# X Input Frame[String] with n rows and d columns (raw tuples)
+# gloveMatrix Matrix[Double] of size |V| × e (pre-trained GloVe embeddings), where |V| is the number of words and e the embedding dimension
+# vocab Frame[String] of size |V| × 1 (vocabulary aligned with gloveMatrix)
+# similarityMeasure (optional) String specifying the similarity metric: "cosine" or "euclidean"
+# threshold (optional) Double threshold above which tuples are considered duplicates
+# --------------------------------------------------------------------------------------
+#
+# OUTPUT:
+# --------------------------------------------------------------------------------------
+# Y_unique Frame[String] with deduplicated tuples
+# (first occurrence of each duplicate group is retained)
+# Y_duplicates Frame[String] with all detected duplicates
+# (i.e., tuples removed from the input)
+# --------------------------------------------------------------------------------------
+
+f_dedup = function(Frame[String] X, Matrix[Double] gloveMatrix, Frame[String] vocab, String similarityMeasure = "cosine", Double threshold = 0.8)
+ return(Frame[String] Y_unique, Frame[String] Y_duplicates)
+{
+ # Step 1: Distributed Representation (DRs)
+ V = computeDRMatrix(X, vocab, gloveMatrix)
+
+ # Step 2: generate LSH Hyperplanes
+ K = 10 # number of hash functions
+ d = ncol(V)
+ H = rand(rows=K, cols=d, pdf="uniform", seed=-1)
+
+ # Step 3: Compute LSH Hashcodes
+ hashCodes = computeLSH(V, H)
+
+ # Step 4: Form Buckets
+ bucketIDs = formBuckets(hashCodes)
+
+ # Step 5: Candidate Pair Generation
+ pairs = findCandidatePairs(bucketIDs)
+
+ # Step 6: Compute Similarity for Pairs
+ sim = computeSimilarity(V, pairs, similarityMeasure)
+
+ # Step 7: Filter Duplicates
+ matches = filterDuplicates(pairs, sim, threshold)
+
+ # Step 8: Extract duplicate indices
+ rows = nrow(matches)
+
+ tmp1 = ifelse(rows > 0, matches[1:rows, 1:1], matrix(0, rows=0, cols=1))
+ tmp2 = ifelse(rows > 0, matches[1:rows, 2:2], matrix(0, rows=0, cols=1))
+ allDupIDs = rbind(tmp1, tmp2)
+ allDupIDs = ifelse(nrow(allDupIDs) > 0, unique(allDupIDs), matrix(0, rows=0, cols=1))
+
+ # Step 9: Keep the first index, remove all others
+ keepMask = matrix(1, rows=nrow(X), cols=1)
+
+ if (nrow(allDupIDs) > 0) {
+ # Find the first index (minimum) among the duplicates
+ minIdx = min(allDupIDs)
+ for (i in 1:nrow(allDupIDs)) {
+ idx = as.scalar(allDupIDs[i,1])
+ if (idx != minIdx) {
+ keepMask[idx,1] = 0
+ }
+ }
+ }
+
+ # extract IDs from keepMask
+ keepIDs = matrix(0, rows=0, cols=1)
+ dupIndices = matrix(0, rows=0, cols=1)
+ for (i in 1:nrow(keepMask)) {
+ if (as.scalar(keepMask[i,1]) == 1) {
+ keepIDs = rbind(keepIDs, matrix(i,1,1))
+ } else {
+ dupIndices = rbind(dupIndices, matrix(i,1,1))
+ }
+ }
+
+ # Step 10: Extract duplicates and unique rows from X
+ Y_duplicates = removeEmpty(target=X[1,], margin="rows")
+ Y_unique = removeEmpty(target=X[1,], margin="rows")
+
+ if (nrow(dupIndices) > 0) {
+ for (i in 1:nrow(dupIndices)) {
+ id = as.scalar(dupIndices[i, 1])
+ row = X[id, ]
+ Y_duplicates = rbind(Y_duplicates, row)
+ }
+ }
+ if (nrow(keepIDs) > 0) {
+ for (i in 1:nrow(keepIDs)) {
+ id = as.scalar(keepIDs[i, 1])
+ row = X[id, ]
+ Y_unique = rbind(Y_unique, row)
+ }
+ }
+}
+
+computeDRMatrix = function(Frame[String] X, Frame[String] vocab, Matrix[Double] gloveMatrix)
+ return(Matrix[Double] V)
+{
+ # TODO: Vectorize this implementation with dedicated transform-encode permutation matrices
+ n = nrow(X)
+ d = ncol(gloveMatrix)
+ V = matrix(0, rows=n, cols=d) # define output matrix
+
+ for (i in 1:n) {
+ row = X[i,]
+ words = transformapply(row, "UtilFunctions.cleanAndTokenizeRow")
+
+ sumVec = matrix(0, rows=1, cols=d)
+ count = 0
+
+ for (k in 1:length(words)) {
+ w = words[k]
+ idx = -1
+ found = FALSE
+
+ # search for word in vocabulary
+ for (m in 1:nrow(vocab)) {
+ if (!found & vocab[m,1] == w) {
+ idx = m
+ found = TRUE
+ }
+ }
+ # word found
+ if (idx > 0) {
+ sumVec = sumVec + gloveMatrix[idx,]
+ count = count + 1
+ }
+ }
+ if (count > 0) {
+ V[i,] = sumVec / count
+ }
+ else {
+ V[i,] = sumVec
+ }
+ }
+}
+
+computeLSH = function(Matrix[Double] V, Matrix[Double] H)
+ return(Matrix[Double] hashCodes)
+{
+ # matrix multiplication: projection of each DR vector on hyperplanes
+ P = V %*% t(H)
+
+ # compare elementwise
+ hashCodes = (P >= 0) # returns 1 for true, 0 for false
+}
+
+formBuckets = function(Matrix[Double] hashCodes)
+ return(Matrix[Double] bucketIDs)
+{
+ # TODO vectorize
+ n = nrow(hashCodes)
+ K = ncol(hashCodes)
+
+ # generate binary weighting vector (2^(K-1), ..., 2^0)
+ powers = matrix(0, rows=1, cols=K)
+ for (k in 1:K) {
+ powers[1, k] = 2^(K-k)
+ }
+
+ # generate Bucket-IDs
+ bucketIDs = hashCodes %*% t(powers)
+}
+
+findCandidatePairs = function(Matrix[Double] bucketIDs)
+ return(Matrix[Double] pairs)
+{
+ n = nrow(bucketIDs)
+ pairs = matrix(0, rows=0, cols=2)
+
+ # O(n^2) comparison; TODO: possibly improve via a Java implementation
+ for (i in 1:(n - 1)) {
+ for (j in (i + 1):n) {
+ if (as.scalar(bucketIDs[i,1]) == as.scalar(bucketIDs[j,1])) {
+ pairs = rbind(pairs, matrix([i, j], rows=1, cols=2))
+ }
+ }
+ }
+}
+
+computeSimilarity = function(Matrix[Double] V, Matrix[Double] pairs, String similarityMeasure)
+ return(Matrix[Double] similarities)
+{
+ m = nrow(pairs)
+ d = ncol(V)
+ similarities = matrix(0.0, rows=m, cols=1)
+
+ for (k in 1:m) {
+ i = as.scalar(pairs[k,1])
+ j = as.scalar(pairs[k,2])
+
+ vi = V[i,] # vector i
+ vj = V[j,] # vector j
+
+ if (similarityMeasure == "cosine") {
+ dot = sum(vi * vj)
+ norm_i = sqrt(sum(vi^2))
+ norm_j = sqrt(sum(vj^2))
+ sim = dot / (norm_i * norm_j)
+ }
+ else if (similarityMeasure == "euclidean") {
+ diff = vi - vj
+ sim = -1 * sqrt(sum(diff^2))
+ }
+ else {
+ stop("Unsupported similarity measure: " + similarityMeasure)
+ }
+
+ similarities[k,1] = sim
+ }
+}
+
+filterDuplicates = function(Matrix[Double] pairs, Matrix[Double] similarities, Double threshold)
+ return(Matrix[Double] matches)
+{
+ m = nrow(pairs)
+ matches = matrix(0, rows=0, cols=2)
+
+ for (i in 1:m) {
+ sim = similarities[i,1]
+
+ if (sim >= threshold) {
+ row = matrix(pairs[i,], rows=1, cols=2) #row = pairs[i,]
+ matches = rbind(matches, row)
+ }
+ }
+}
\ No newline at end of file
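
For illustration only (not part of the commit), here is a toy DML sketch of the random-hyperplane hashing and bucket-ID arithmetic implemented by computeLSH and formBuckets above; the example values and the [-1,1] hyperplane range are assumptions made for the sketch.

# toy example: 2 tuple embeddings of dimension 3
V = matrix("0.2 -0.1 0.4 -0.3 0.5 -0.2", rows=2, cols=3)
# 4 random hyperplanes, drawn in [-1,1] here purely for illustration
H = rand(rows=4, cols=3, min=-1, max=1, seed=7)
# sign of the projections yields a 4-bit hash code per tuple (1 if >= 0, else 0)
hashCodes = (V %*% t(H)) >= 0
# binary-to-integer conversion with weights 2^3, 2^2, 2^1, 2^0, as in formBuckets
powers = matrix("8 4 2 1", rows=1, cols=4)
bucketIDs = hashCodes %*% t(powers)
# tuples that share a bucket ID become candidate pairs
print(toString(bucketIDs))

Tuples with identical bucket IDs land in the same bucket, so only those pairs are compared in the subsequent similarity step.
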
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 5fe1721cc2..4feab311c7 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -115,6 +115,7 @@ public enum Builtins {
DECISIONTREE("decisionTree", true),
DECISIONTREEPREDICT("decisionTreePredict", true),
DECOMPRESS("decompress", false),
+ DEDUP("dedup", true),
DEEPWALK("deepWalk", true),
DET("det", false),
DETECTSCHEMA("detectSchema", false),
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
index fffd7ee7ed..212d0d5c56 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
@@ -30,7 +30,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
-import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
import org.apache.sysds.runtime.util.CommonThreadPool;
import java.util.concurrent.ExecutorService;
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index 3fd1dfd1a3..cc370d6ae9 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -1456,4 +1456,19 @@ public class UtilFunctions {
//wer = number of edits / length
return (double)p[n] / Math.max(n, m);
}
-}
+
+ public static String[] cleanAndTokenizeRow(String[] row) {
+ if (row == null || row.length == 0) {
+ return new String[0];
+ }
+ StringBuilder sb = new StringBuilder();
+ for (String s : row) {
+ if (s != null) {
+ sb.append(s).append(" ");
+ }
+ }
+ String joined = sb.toString().trim().toLowerCase();
+
+ return joined.split("\\s+");
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinDedupTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinDedupTest.java
new file mode 100644
index 0000000000..4be1781207
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/part1/BuiltinDedupTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.builtin.part1;
+
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+public class BuiltinDedupTest extends AutomatedTestBase {
+ private final static String TEST_NAME = "distributed_representation";
+ private final static String TEST_DIR = "functions/builtin/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinDedupTest.class.getSimpleName() + "/";
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"Y_unique", "Y_duplicates"}));
+ if (TEST_CACHE_ENABLED) {
+ setOutAndExpectedDeletionDisabled(true);
+ }
+ }
+
+ @Test
+ public void testSimpleDedupCP() {
+ runTestCase(ExecType.CP);
+ }
+
+ @SuppressWarnings("unused")
+ private void runTestCase(ExecType execType) {
+ Types.ExecMode platformOld = setExecMode(execType);
+ try {
+ loadTestConfiguration(getTestConfiguration(TEST_NAME));
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+
+ programArgs = new String[]{
+ "-stats", "-args",
+ input("X"), input("gloveMatrix"),
input("vocab"),
+ "cosine", "0.8",
+ output("Y_unique"), output("Y_duplicates")
+ };
+
+ // ----- Frame X -----
+ String[][] X = new String[][]{
+ {"John Doe", "New York"},
+ {"Jon Doe", "New York City"},
+ {"Jane Doe", "Boston"},
+ {"John Doe", "NY"}
+ };
+ ValueType[] schemaX = new ValueType[]{ValueType.STRING, ValueType.STRING};
+ FrameBlock fbX = new FrameBlock(schemaX);
+ for (String[] row : X) fbX.appendRow(row);
+ writeInputFrameWithMTD("X", fbX, true, new
MatrixCharacteristics(X.length, X[0].length, -1, -1), schemaX,
FileFormat.BINARY);
+
+ // ----- Vocab -----
+ String[][] vocab = new String[][]{
+ {"john"}, {"doe"}, {"new"}, {"york"},
+ {"city"}, {"boston"}, {"ny"}, {"jane"}
+ };
+ ValueType[] schemaVocab = new ValueType[]{ValueType.STRING};
+ FrameBlock fbVocab = new FrameBlock(schemaVocab);
+ for (String[] row : vocab) fbVocab.appendRow(row);
+ writeInputFrameWithMTD("vocab", fbVocab, true, new
MatrixCharacteristics(vocab.length, 1, -1, -1), schemaVocab, FileFormat.BINARY);
+
+ // ----- Glove-Matrix -----
+ double[][] gloveMatrix = getRandomMatrix(vocab.length, 50, -0.5, 0.5, 1, 123);
+ writeInputMatrixWithMTD("gloveMatrix", gloveMatrix, true);
+
+ // Run
+ runTest(true, false, null, -1);
+
+ // Expected unique
+ String[][] expectedUnique = new String[][] {
+ {"John Doe", "New York"},
+ {"Jon Doe", "New York City"},
+ {"Jane Doe", "Boston"}
+ };
+
+ // Expected duplicates
+ String[][] expectedDupes = new String[][] {
+ {"John Doe", "NY"}
+ };
+
+ /* FIXME test/builtin correctness
+ // --- Validate output frames ---
+ FrameBlock outUnique = readDMLFrameFromHDFS("Y_unique", FileFormat.BINARY);
+ FrameBlock outDupes = readDMLFrameFromHDFS("Y_duplicates", FileFormat.BINARY);
+
+ String[][] actualUnique = frameBlockToStringArray(outUnique);
+ String[][] actualDupes = frameBlockToStringArray(outDupes);
+
+ // Compare
+ System.out.println("Unqiue tuples: " +
Arrays.deepToString(actualUnique));
+ System.out.println("Actual Dupes: " +
Arrays.deepToString(actualDupes));
+ assertStringArrayEquals(expectedUnique, actualUnique);
+ assertStringArrayEquals(expectedDupes, actualDupes);
+ */
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ rtplatform = platformOld;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private String[][] frameBlockToStringArray(FrameBlock fb) {
+ int rows = fb.getNumRows();
+ int cols = fb.getNumColumns();
+ String[][] out = new String[rows][cols];
+ for (int i = 0; i < rows; i++) {
+ for (int j = 0; j < cols; j++) {
+ Object val = fb.get(i, j);
+ out[i][j] = (val != null) ? val.toString() : "";
+ }
+ }
+ return out;
+ }
+
+ @SuppressWarnings("unused")
+ private void assertStringArrayEquals(String[][] expected, String[][] actual) {
+ if (expected.length != actual.length || expected[0].length != actual[0].length) {
+ throw new AssertionError("Array dimensions do not match");
+ }
+ for (int i = 0; i < expected.length; i++) {
+ for (int j = 0; j < expected[0].length; j++) {
+ if (!expected[i][j].equals(actual[i][j])) {
+ throw new AssertionError(String.format(
+ "Mismatch at [%d,%d]: expected='%s' but got='%s'",
+ i, j, expected[i][j], actual[i][j]));
+ }
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
index 0dc04043d4..c7a037a50c 100644
--- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeTest.java
@@ -46,7 +46,7 @@ public class TransposeTest extends AutomatedTestBase {
private final static int rows = 1000;
private final static int cols_wide = 1000;
- private final static int cols_skinny = 500;
+ //private final static int cols_skinny = 500;
private final static double sparsity1 = 0.7;
private final static double sparsity2 = 0.1;
diff --git a/src/test/scripts/functions/builtin/distributed_representation.dml b/src/test/scripts/functions/builtin/distributed_representation.dml
new file mode 100644
index 0000000000..69b06598a1
--- /dev/null
+++ b/src/test/scripts/functions/builtin/distributed_representation.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1)
+gloveMatrix = read($2)
+vocab = read($3)
+similarity = $4
+threshold = as.double($5)
+
+[Y_unique, Y_duplicates] = dedup(X, gloveMatrix, vocab, similarity, threshold)
+
+write(Y_unique, $6, format="binary")
+write(Y_duplicates, $7, format="binary")
\ No newline at end of file
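
For reference, a minimal DML sketch of invoking the new builtin directly, with hypothetical CSV file names in place of the positional $-arguments used by the test script above:

# hypothetical input/output paths; CSV chosen only for readability
X = read("tuples.csv", data_type="frame", format="csv")
gloveMatrix = read("glove.csv", format="csv")
vocab = read("vocab.csv", data_type="frame", format="csv")
[Y_unique, Y_duplicates] = dedup(X, gloveMatrix, vocab, "cosine", 0.8)
write(Y_unique, "tuples_unique.csv", format="csv")
write(Y_duplicates, "tuples_duplicates.csv", format="csv")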