gemini-code-assist[bot] commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756092971
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
this.groups = new Map();
}
+ /**
+ * Flushes entries from the cache using LRU eviction.
+ * Evicts the least recently used entries (from the front of the Map)
+ * until the cache size is at or below the target.
+ */
+ flushLRU(target: number): ProcessResult {
+ const result = new ProcessResultBuilder();
+ const toDelete: string[] = [];
+ // Iterate from the front (oldest/least recently used entries)
+ for (const [wkey, values] of this.groups) {
+ if (this.groups.size - toDelete.length <= target) {
+ break;
+ }
+ const parts = wkey.split(" ");
+ const encodedWindow = parts[0];
+ const encodedKey = parts[1];
+ const window = decodeFromBase64(encodedWindow, this.windowCoder);
+ result.add(
+ this.receiver.receive({
+ value: {
+ key: decodeFromBase64(encodedKey, this.keyCoder),
+ value: values,
+ },
+ windows: [window],
+ timestamp: window.maxTimestamp(),
+ pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+ }),
+ );
+ toDelete.push(wkey);
+ }
+ for (const wkey of toDelete) {
+ this.groups.delete(wkey);
+ }
+ return result.build();
+ }
Review Comment:

There is significant code duplication between `flushLRU` and the `flush`
method. The logic for iterating, decoding keys, sending results, and collecting
keys for deletion is nearly identical. While the loop termination condition is
checked at a different point, the core processing logic is the same. To improve
maintainability and reduce redundancy, this common logic should be extracted
into a private helper method that both `flushLRU` and `flush` can call.
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -468,24 +470,36 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
);
}
+ /**
+ * Moves a key to the end of the Map (most recently used position).
+ * JavaScript Maps preserve insertion order, so delete + set moves to end.
+ */
+ private touchKey(wkey: string, value: A) {
+ this.groups.delete(wkey);
+ this.groups.set(wkey, value);
+ }
+
process(wvalue: WindowedValue<any>) {
for (const window of wvalue.windows) {
const wkey =
encodeToBase64(window, this.windowCoder) +
" " +
encodeToBase64(wvalue.value.key, this.keyCoder);
- if (!this.groups.has(wkey)) {
- this.groups.set(wkey, this.combineFn.createAccumulator());
- }
- this.groups.set(
- wkey,
- this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value),
- );
+ const existingAccumulator = this.groups.get(wkey);
+ const newAccumulator =
+ existingAccumulator !== undefined
+ ? this.combineFn.addInput(existingAccumulator, wvalue.value.value)
+ : this.combineFn.addInput(
+ this.combineFn.createAccumulator(),
+ wvalue.value.value,
+ );
+ // Move to end (most recently used) by delete + set
+ this.touchKey(wkey, newAccumulator);
Review Comment:

The logic for updating the accumulator and managing the LRU behavior can be
simplified and made slightly more performant. The current approach uses a
`touchKey` helper method which is called for every element. This results in a
`delete` operation even for new keys, which is unnecessary. By inlining this
logic into the `process` method, we can avoid the helper and the extra
operation for new keys, making the code more concise and efficient. The
`delete` is still required for keys that already exist, because calling `set`
on an existing key updates its value without changing its position in the
Map's insertion order. This change would also make the `touchKey` method
(lines 477-480) obsolete, allowing for its removal.
```typescript
const accumulator = this.groups.get(wkey);
if (accumulator !== undefined) {
this.groups.delete(wkey);
}
this.groups.set(
wkey,
this.combineFn.addInput(
accumulator ?? this.combineFn.createAccumulator(),
wvalue.value.value
)
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]