[ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355790
 ]

ASF GitHub Bot logged work on BEAM-8550:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Dec/19 10:24
            Start Date: 08/Dec/19 10:24
    Worklog Time Spent: 10m 
      Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168771
 
 

 ##########
 File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##########
 @@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, 
partitioner);
 
-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable<WindowedValue<V>> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, 
windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD
+          .map(
+              input -> {
+                final K key = input.getKey();
+                Iterable<WindowedValue<V>> value = input.getValue();
+                return FluentIterable.from(value)
+                    .transform(
+                        windowedValue ->
+                            windowedValue.withValue(KV.of(key, 
windowedValue.getValue())))
+                    .iterator();
+              })
+          .flatMapToPair(doFnFunction);
+    }
+
+    JavaPairRDD<ByteArray, byte[]> pairRDD =
+        kvInRDD
+            .map(new ReifyTimestampsAndWindowsFunction<>())
+            .mapToPair(TranslationUtils.toPairFunction())
+            .mapToPair(
+                CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> 
in._2().getTimestamp()));
+
+    JavaPairRDD<ByteArray, byte[]> sorted =
+        
pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+    return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, 
keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) 
{
+    return new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return partitioner.numPartitions();
+      }
+
+      @Override
+      public int getPartition(Object o) {
+        ByteArray b = (ByteArray) o;
+        return partitioner.getPartition(
+            new ByteArray(Arrays.copyOfRange(b.getValue(), 0, 
b.getValue().length - 8)));
+      }
+    };
+  }
+
+  private static <K, V, OutputT>
+      PairFlatMapFunction<Iterator<Tuple2<ByteArray, byte[]>>, TupleTag<?>, 
WindowedValue<?>>
+          wrapDoFnFromSortedRDD(
+              MultiDoFnFunction<KV<K, V>, OutputT> doFnFunction,
+              Coder<K> keyCoder,
+              Coder<WindowedValue<V>> wvCoder) {
+
+    return (Iterator<Tuple2<ByteArray, byte[]>> in) -> {
+      Iterator<Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>>> mappedGroups;
+      mappedGroups =
+          Iterators.transform(
+              splitBySameKey(in, keyCoder, wvCoder),
+              group -> {
+                try {
+                  return doFnFunction.call(group);
+                } catch (Exception ex) {
+                  throw new RuntimeException(ex);
+                }
+              });
+      return flatten(mappedGroups);
+    };
+  }
+
+  @VisibleForTesting
+  static <T> Iterator<T> flatten(final Iterator<Iterator<T>> toFlatten) {
+
+    return new AbstractIterator<T>() {
+
+      @Nullable Iterator<T> current = null;
+
+      @Override
+      protected T computeNext() {
+        while (true) {
+          if (current == null) {
+            if (toFlatten.hasNext()) {
+              current = toFlatten.next();
+            } else {
+              return endOfData();
+            }
+          }
+          if (current.hasNext()) {
+            return current.next();
+          }
+          current = null;
+        }
+      }
+    };
+  }
+
+  @VisibleForTesting
+  static <K, V> Iterator<Iterator<WindowedValue<KV<K, V>>>> splitBySameKey(
+      Iterator<Tuple2<ByteArray, byte[]>> in, Coder<K> keyCoder, 
Coder<WindowedValue<V>> wvCoder) {
+
+    return new AbstractIterator<Iterator<WindowedValue<KV<K, V>>>>() {
+
+      @Nullable Tuple2<ByteArray, byte[]> read = null;
+
+      @Override
+      protected Iterator<WindowedValue<KV<K, V>>> computeNext() {
+        readNext();
+        if (read != null) {
+          byte[] value = read._1().getValue();
+          byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8);
+          K key = CoderHelpers.fromByteArray(keyPart, keyCoder);
+          return createIteratorForKey(keyPart, key);
+        }
+        return endOfData();
+      }
+
+      private void readNext() {
+        if (read == null) {
+          if (in.hasNext()) {
+            read = in.next();
+          }
+        }
+      }
+
+      private void consumed() {
+        read = null;
+      }
+
+      private Iterator<WindowedValue<KV<K, V>>> createIteratorForKey(byte[] 
keyPart, K key) {
+
+        return new AbstractIterator<WindowedValue<KV<K, V>>>() {
+          @Override
+          protected WindowedValue<KV<K, V>> computeNext() {
+            readNext();
+            if (read != null) {
+              byte[] value = read._1().getValue();
+              byte[] prefix = Arrays.copyOfRange(value, 0, value.length - 8);
 
 Review comment:
   reuse this (V, Ts) split
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 355790)
    Time Spent: 3h  (was: 2h 50m)

> @RequiresTimeSortedInput DoFn annotation
> ----------------------------------------
>
>                 Key: BEAM-8550
>                 URL: https://issues.apache.org/jira/browse/BEAM-8550
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Implement a new annotation {{@RequiresTimeSortedInput}} for stateful DoFns, as described in the
> [design document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
> The first implementation will assume that:
>   - time is defined by the timestamp in the associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped
>     (because they are out of order)
> The above properties are considered temporary and will be relaxed by
> subsequent, backwards-compatible extensions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to