jbartok commented on a change in pull request #12567:
URL: https://github.com/apache/beam/pull/12567#discussion_r471405630



##########
File path: runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
##########
@@ -276,36 +262,94 @@ Processor getEx(
           inputValueCoder,
           outputValueCoders,
           ordinalToSideInput,
+          sideInputMapping,
           ownerId,
           stepId);
     }
   }
 
-  private static class KeyedStepContext implements StepContext {
+  private class KeyedStepContext implements StepContext {
 
-    private final Map<Object, InMemoryStateInternals> stateInternalsOfKeys;
-    private final InMemoryTimerInternals timerInternals;
+    private final Object nullKey = new Object();
 
-    private InMemoryStateInternals currentStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryStateInternals> keyedStateInternals;
+    private final ConcurrentHashMap<Object, InMemoryTimerInternals> keyedTimerInternals;
 
-    KeyedStepContext(InMemoryTimerInternals timerInternals) {
-      this.stateInternalsOfKeys = new HashMap<>();
-      this.timerInternals = timerInternals;
+    @SuppressWarnings("ThreadLocalUsage")
+    private final ThreadLocal<Object> currentKey = new ThreadLocal<>();
+
+    KeyedStepContext() {
+      this.keyedStateInternals = new ConcurrentHashMap<>();
+      this.keyedTimerInternals = new ConcurrentHashMap<>();
     }
 
     void setKey(Object key) {
-      currentStateInternals =
-          stateInternalsOfKeys.computeIfAbsent(key, InMemoryStateInternals::forKey);
+      Object normalizedKey = key == null ? nullKey : key;
+      currentKey.set(normalizedKey);
+      keyedStateInternals.computeIfAbsent(normalizedKey, InMemoryStateInternals::forKey);
+      keyedTimerInternals.computeIfAbsent(normalizedKey, k -> new InMemoryTimerInternals());
+    }
+
+    void clearKey() {
+      currentKey.remove();
     }
 
     @Override
     public StateInternals stateInternals() {
-      return currentStateInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedStateInternals.get(key);
     }
 
     @Override
     public TimerInternals timerInternals() {
-      return timerInternals;
+      Object key = currentKey.get();
+      if (key == null) {
+        throw new IllegalStateException("Active key should be set");
+      }
+      return keyedTimerInternals.get(key);
+    }
+
+    public void advanceProcessingTimes() {
+      Instant now = Instant.now();
+      keyedTimerInternals
+          .values()
+          .forEach(
+              timerInternals -> {
+                try {
+                  timerInternals.advanceProcessingTime(now);
+                  timerInternals.advanceSynchronizedProcessingTime(now);
+                } catch (Exception e) {
+                  throw new RuntimeException("Failed advancing time!");
+                }
+              });
+    }
+
+    public void flushTimers(long watermark) {
+      Instant watermarkInstant = new Instant(watermark);
+      keyedTimerInternals
+          .entrySet()
+          .forEach(
+              (entry) -> {
+                InMemoryTimerInternals timerInternals = entry.getValue();
+                if (timerInternals.currentInputWatermarkTime().isBefore(watermark)) {
+                  try {
+                    timerInternals.advanceInputWatermark(watermarkInstant);
+                    if (watermarkInstant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+                      timerInternals.advanceProcessingTime(watermarkInstant);
+                      timerInternals.advanceSynchronizedProcessingTime(watermarkInstant);

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to