This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f3f1c93a60 Don't re-encode byte[] values in SortValues transform (#31025)
9f3f1c93a60 is described below

commit 9f3f1c93a605bd14409cae141967129f67cd8bb6
Author: Claire McGinty <[email protected]>
AuthorDate: Thu May 30 12:45:16 2024 -0400

    Don't re-encode byte[] values in SortValues transform (#31025)
    
    * Don't re-encode byte[] values in SortValues transform
    
    * checkstyle
    
    * Apply code review comments
---
 .../beam/sdk/extensions/sorter/SortValues.java     |  26 +++-
 .../beam/sdk/extensions/sorter/SortValuesTest.java | 144 ++++++++++++++++++---
 2 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
index bc9fb2f8955..e7618681e1b 100644
--- a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -20,7 +20,9 @@ package org.apache.beam.sdk.extensions.sorter;
 import java.io.IOException;
 import java.util.Iterator;
 import javax.annotation.Nonnull;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -131,6 +133,20 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
     return getSecondaryKeyValueCoder(inputCoder).getValueCoder();
   }
 
+  private static <T> T elementOf(Coder<T> coder, byte[] bytes) throws 
CoderException {
+    if (coder instanceof ByteArrayCoder) {
+      return (T) bytes;
+    }
+    return CoderUtils.decodeFromByteArray(coder, bytes);
+  }
+
+  private static <T> byte[] bytesOf(Coder<T> coder, T element) throws 
CoderException {
+    if (element instanceof byte[]) {
+      return (byte[]) element;
+    }
+    return CoderUtils.encodeToByteArray(coder, element);
+  }
+
   private static class SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>
       extends DoFn<
           KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>,
@@ -156,9 +172,7 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
         Sorter sorter = BufferedExternalSorter.create(sorterOptions);
         for (KV<SecondaryKeyT, ValueT> record : records) {
           sorter.add(
-              KV.of(
-                  CoderUtils.encodeToByteArray(keyCoder, record.getKey()),
-                  CoderUtils.encodeToByteArray(valueCoder, 
record.getValue())));
+              KV.of(bytesOf(keyCoder, record.getKey()), bytesOf(valueCoder, 
record.getValue())));
         }
 
         c.output(KV.of(c.element().getKey(), new 
DecodingIterable(sorter.sort())));
@@ -197,9 +211,9 @@ public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
       public KV<SecondaryKeyT, ValueT> next() {
         KV<byte[], byte[]> next = iterator.next();
         try {
-          return KV.of(
-              CoderUtils.decodeFromByteArray(keyCoder, next.getKey()),
-              CoderUtils.decodeFromByteArray(valueCoder, next.getValue()));
+          SecondaryKeyT secondaryKey = elementOf(keyCoder, next.getKey());
+          ValueT value = elementOf(valueCoder, next.getValue());
+          return KV.of(secondaryKey, value);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
index fa3fbe6ae16..451583a9ba9 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
@@ -22,7 +22,10 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -67,30 +70,141 @@ public class SortValuesTest {
         grouped.apply(SortValues.create(BufferedExternalSorter.options()));
 
     PAssert.that(groupedAndSorted)
-        .satisfies(new 
AssertThatHasExpectedContentsForTestSecondaryKeySorting());
+        .satisfies(
+            new AssertThatHasExpectedContentsForTestSecondaryKeySorting<>(
+                Arrays.asList(
+                    KV.of(
+                        "key1",
+                        Arrays.asList(
+                            KV.of("secondaryKey1", 10),
+                            KV.of("secondaryKey2", 20),
+                            KV.of("secondaryKey3", 30))),
+                    KV.of(
+                        "key2",
+                        Arrays.asList(KV.of("secondaryKey1", 100), 
KV.of("secondaryKey2", 200))))));
+
+    p.run();
+  }
+
+  @Test
+  public void testSecondaryKeyByteOptimization() {
+    PCollection<KV<String, KV<byte[], Integer>>> input =
+        p.apply(
+            Create.of(
+                Arrays.asList(
+                    KV.of("key1", 
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), 20)),
+                    KV.of("key2", 
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), 200)),
+                    KV.of("key1", 
KV.of("secondaryKey3".getBytes(StandardCharsets.UTF_8), 30)),
+                    KV.of("key1", 
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), 10)),
+                    KV.of("key2", 
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), 100)))));
+
+    // Group by Key, bringing <SecondaryKey, Value> pairs for the same Key 
together.
+    PCollection<KV<String, Iterable<KV<byte[], Integer>>>> grouped =
+        input.apply(GroupByKey.create());
+
+    // For every Key, sort the iterable of <SecondaryKey, Value> pairs by 
SecondaryKey.
+    PCollection<KV<String, Iterable<KV<byte[], Integer>>>> groupedAndSorted =
+        grouped.apply(SortValues.create(BufferedExternalSorter.options()));
+
+    PAssert.that(groupedAndSorted)
+        .satisfies(
+            new AssertThatHasExpectedContentsForTestSecondaryKeySorting<>(
+                Arrays.asList(
+                    KV.of(
+                        "key1",
+                        Arrays.asList(
+                            
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), 10),
+                            
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), 20),
+                            
KV.of("secondaryKey3".getBytes(StandardCharsets.UTF_8), 30))),
+                    KV.of(
+                        "key2",
+                        Arrays.asList(
+                            
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), 100),
+                            
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), 200))))));
+
+    p.run();
+  }
+
+  @Test
+  public void testSecondaryKeyAndValueByteOptimization() {
+    PCollection<KV<String, KV<byte[], byte[]>>> input =
+        p.apply(
+            Create.of(
+                Arrays.asList(
+                    KV.of(
+                        "key1",
+                        
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), new byte[] {1})),
+                    KV.of(
+                        "key2",
+                        
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), new byte[] {2})),
+                    KV.of(
+                        "key1",
+                        
KV.of("secondaryKey3".getBytes(StandardCharsets.UTF_8), new byte[] {3})),
+                    KV.of(
+                        "key1",
+                        
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), new byte[] {4})),
+                    KV.of(
+                        "key2",
+                        
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), new byte[] {5})))));
+
+    // Group by Key, bringing <SecondaryKey, Value> pairs for the same Key 
together.
+    PCollection<KV<String, Iterable<KV<byte[], byte[]>>>> grouped =
+        input.apply(GroupByKey.create());
+
+    // For every Key, sort the iterable of <SecondaryKey, Value> pairs by 
SecondaryKey.
+    PCollection<KV<String, Iterable<KV<byte[], byte[]>>>> groupedAndSorted =
+        grouped.apply(SortValues.create(BufferedExternalSorter.options()));
+
+    PAssert.that(groupedAndSorted)
+        .satisfies(
+            new AssertThatHasExpectedContentsForTestSecondaryKeySorting<>(
+                Arrays.asList(
+                    KV.of(
+                        "key1",
+                        Arrays.asList(
+                            
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), new byte[] {4}),
+                            
KV.of("secondaryKey2".getBytes(StandardCharsets.UTF_8), new byte[] {1}),
+                            KV.of(
+                                
"secondaryKey3".getBytes(StandardCharsets.UTF_8), new byte[] {3}))),
+                    KV.of(
+                        "key2",
+                        Arrays.asList(
+                            
KV.of("secondaryKey1".getBytes(StandardCharsets.UTF_8), new byte[] {5}),
+                            KV.of(
+                                
"secondaryKey2".getBytes(StandardCharsets.UTF_8),
+                                new byte[] {2}))))));
 
     p.run();
   }
 
-  static class AssertThatHasExpectedContentsForTestSecondaryKeySorting
-      implements SerializableFunction<Iterable<KV<String, Iterable<KV<String, 
Integer>>>>, Void> {
+  static class 
AssertThatHasExpectedContentsForTestSecondaryKeySorting<SecondaryKeyT, ValueT>
+      implements SerializableFunction<
+          Iterable<KV<String, Iterable<KV<SecondaryKeyT, ValueT>>>>, Void> {
+    final List<KV<String, List<KV<SecondaryKeyT, ValueT>>>> expected;
+
+    AssertThatHasExpectedContentsForTestSecondaryKeySorting(
+        List<KV<String, List<KV<SecondaryKeyT, ValueT>>>> expected) {
+      this.expected = expected;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
-    public Void apply(Iterable<KV<String, Iterable<KV<String, Integer>>>> 
actual) {
+    public Void apply(Iterable<KV<String, Iterable<KV<SecondaryKeyT, 
ValueT>>>> actual) {
       assertThat(
           actual,
           containsInAnyOrder(
-              KvMatcher.isKv(
-                  is("key1"),
-                  contains(
-                      KvMatcher.isKv(is("secondaryKey1"), is(10)),
-                      KvMatcher.isKv(is("secondaryKey2"), is(20)),
-                      KvMatcher.isKv(is("secondaryKey3"), is(30)))),
-              KvMatcher.isKv(
-                  is("key2"),
-                  contains(
-                      KvMatcher.isKv(is("secondaryKey1"), is(100)),
-                      KvMatcher.isKv(is("secondaryKey2"), is(200))))));
+              expected.stream()
+                  .map(
+                      kv1 ->
+                          KvMatcher.isKv(
+                              is(kv1.getKey()),
+                              contains(
+                                  kv1.getValue().stream()
+                                      .map(
+                                          kv2 ->
+                                              KvMatcher.isKv(is(kv2.getKey()), 
is(kv2.getValue())))
+                                      .collect(Collectors.toList()))))
+                  .collect(Collectors.toList())));
       return null;
     }
   }

Reply via email to