junaiddshaukat commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756112669


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
     this.groups = new Map();
   }
 
+  /**
+   * Flushes entries from the cache using LRU eviction.
+   * Evicts the least recently used entries (from the front of the Map)
+   * until the cache size is at or below the target.
+   */
+  flushLRU(target: number): ProcessResult {
+    const result = new ProcessResultBuilder();
+    const toDelete: string[] = [];
+    // Iterate from the front (oldest/least recently used entries)
+    for (const [wkey, values] of this.groups) {
+      if (this.groups.size - toDelete.length <= target) {
+        break;
+      }
+      const parts = wkey.split(" ");
+      const encodedWindow = parts[0];
+      const encodedKey = parts[1];
+      const window = decodeFromBase64(encodedWindow, this.windowCoder);
+      result.add(
+        this.receiver.receive({
+          value: {
+            key: decodeFromBase64(encodedKey, this.keyCoder),
+            value: values,
+          },
+          windows: [window],
+          timestamp: window.maxTimestamp(),
+          pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+        }),
+      );
+      toDelete.push(wkey);
+    }
+    for (const wkey of toDelete) {
+      this.groups.delete(wkey);
+    }
+    return result.build();
+  }

Review Comment:
   Great suggestion! You're absolutely right, since flush is only called with 
target=0 (in finishBundle), and flushLRU(0) would achieve the same result of 
flushing all entries, we can simply replace flush(target) with flushLRU(0). Let 
me fix that!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to