This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f5fc1f4842 Add LRU cache eviction to CachingStateProvider (#37214)
5f5fc1f4842 is described below

commit 5f5fc1f4842f819f389858777efdc561514b606f
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Fri Jan 16 20:01:06 2026 +0500

    Add LRU cache eviction to CachingStateProvider (#37214)
    
    * Add LRU cache eviction to CachingStateProvider
    
    Fixes #37213
    
    Implements LRU (Least Recently Used) cache eviction to prevent
    unbounded memory growth in long-running workers. Adds configurable
    maxCacheSize parameter (default: 1000 entries) and maintains LRU
    order using JavaScript Map's insertion order.
    
    - Add maxCacheSize constructor parameter with default value of 1000
    - Implement evictIfNeeded() to remove oldest entry when cache is full
    - Implement touchCacheEntry() to move accessed items to end (LRU)
    - Add comprehensive test coverage in state_provider_test.ts
    
    This addresses the TODO comment in the code and improves reliability
    for production workloads.
    
    * Address review comments: size-based LRU eviction for CachingStateProvider
    
    - Fixed bug: removed incorrect evictIfNeeded() call in promise callback
    - Removed unnecessary this_ variable (arrow functions capture this)
    - Changed from count-based to size-based eviction (similar to Python statecache.py)
    - Added estimateSize() to calculate memory weight of cached values
    - Default cache weight: 100MB
    - Updated tests to work with weight-based eviction
    
    * Fix prettier formatting
    
    * Address review comments: circular references, eviction ordering, tests
    
    - Fixed sizeof function to handle circular references using visited Set
    - Fixed eviction ordering: add to cache first, then evict (fixes edge case)
    - Added test for oversized item that exceeds maxCacheWeight
    - Implemented custom sizeof instead of object-sizeof package (has Node.js compatibility issues)
    
    * Address Gemini comments: fix race condition, optimize evictIfNeeded
    
    - Fixed critical race condition in promise callback: only update cache if
      the entry is still the same promise we're resolving
    - Optimized evictIfNeeded: use entries() iterator and removed redundant checks
---
 sdks/typescript/package-lock.json               |  90 +++----
 sdks/typescript/src/apache_beam/worker/state.ts | 135 +++++++++-
 sdks/typescript/test/state_provider_test.ts     | 328 ++++++++++++++++++++++++
 3 files changed, 494 insertions(+), 59 deletions(-)

diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json
index 76f314ad490..29918b01ab8 100644
--- a/sdks/typescript/package-lock.json
+++ b/sdks/typescript/package-lock.json
@@ -49,7 +49,6 @@
       "version": "0.8.0",
       "resolved": 
"https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz";,
       "integrity": 
"sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==",
-      "peer": true,
       "engines": {
         "node": ">= 12"
       }
@@ -58,7 +57,6 @@
       "version": "0.7.0",
       "resolved": 
"https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz";,
       "integrity": 
"sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==",
-      "peer": true,
       "dependencies": {
         "@cspotcode/source-map-consumer": "0.8.0"
       },
@@ -194,6 +192,7 @@
       "version": "1.4.6",
       "resolved": 
"https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.4.6.tgz";,
       "integrity": 
"sha512-Byau4xiXfIixb1PnW30V/P9mkrZ05lknyNqiK+cVY9J5hj3gecxd/anwaUbAM8j834zg1x78NvAbwGnMfWEu7A==",
+      "peer": true,
       "dependencies": {
         "@grpc/proto-loader": "^0.6.4",
         "@types/node": ">=12.12.47"
@@ -553,26 +552,22 @@
     "node_modules/@tsconfig/node10": {
       "version": "1.0.8",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz";,
-      "integrity": 
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==",
-      "peer": true
+      "integrity": 
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg=="
     },
     "node_modules/@tsconfig/node12": {
       "version": "1.0.9",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz";,
-      "integrity": 
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==",
-      "peer": true
+      "integrity": 
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw=="
     },
     "node_modules/@tsconfig/node14": {
       "version": "1.0.1",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz";,
-      "integrity": 
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==",
-      "peer": true
+      "integrity": 
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg=="
     },
     "node_modules/@tsconfig/node16": {
       "version": "1.0.2",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz";,
-      "integrity": 
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==",
-      "peer": true
+      "integrity": 
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA=="
     },
     "node_modules/@types/duplexify": {
       "version": "3.6.1",
@@ -602,7 +597,8 @@
     "node_modules/@types/node": {
       "version": "17.0.8",
       "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.8.tgz";,
-      "integrity": 
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg=="
+      "integrity": 
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==",
+      "peer": true
     },
     "node_modules/@typescript-eslint/eslint-plugin": {
       "version": "5.24.0",
@@ -642,6 +638,7 @@
       "resolved": 
"https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.24.0.tgz";,
       "integrity": 
"sha512-4q29C6xFYZ5B2CXqSBBdcS0lPyfM9M09DoQLtHS5kf+WbpV8pBBhHDLNhXfgyVwFnhrhYzOu7xmg02DzxeF2Uw==",
       "dev": true,
+      "peer": true,
       "dependencies": {
         "@typescript-eslint/scope-manager": "5.24.0",
         "@typescript-eslint/types": "5.24.0",
@@ -809,6 +806,7 @@
       "version": "8.7.1",
       "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.1.tgz";,
       "integrity": 
"sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==",
+      "peer": true,
       "bin": {
         "acorn": "bin/acorn"
       },
@@ -829,7 +827,6 @@
       "version": "8.2.0",
       "resolved": 
"https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz";,
       "integrity": 
"sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
-      "peer": true,
       "engines": {
         "node": ">=0.4.0"
       }
@@ -919,8 +916,7 @@
     "node_modules/arg": {
       "version": "4.1.3",
       "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz";,
-      "integrity": 
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==",
-      "peer": true
+      "integrity": 
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA=="
     },
     "node_modules/argle": {
       "version": "1.1.1",
@@ -1294,8 +1290,7 @@
     "node_modules/create-require": {
       "version": "1.1.1",
       "resolved": 
"https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz";,
-      "integrity": 
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
-      "peer": true
+      "integrity": 
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ=="
     },
     "node_modules/cross-spawn": {
       "version": "7.0.3",
@@ -1526,6 +1521,7 @@
       "resolved": "https://registry.npmjs.org/eslint/-/eslint-8.15.0.tgz";,
       "integrity": 
"sha512-GG5USZ1jhCu8HJkzGgeK8/+RGnHaNYZGrGDzUtigK3BsGESW/rs2az23XqE0WVwDxy1VRvvjSSGu5nB0Bu+6SA==",
       "dev": true,
+      "peer": true,
       "dependencies": {
         "@eslint/eslintrc": "^1.2.3",
         "@humanwhocodes/config-array": "^0.9.2",
@@ -2874,8 +2870,7 @@
     "node_modules/make-error": {
       "version": "1.3.6",
       "resolved": 
"https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz";,
-      "integrity": 
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==",
-      "peer": true
+      "integrity": 
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw=="
     },
     "node_modules/marked": {
       "version": "4.2.5",
@@ -2957,6 +2952,7 @@
       "integrity": 
"sha512-8uJR5RTC2NgpY3GrYcgpZrsEd9zKbPDpob1RezyR2upGHRQtHWofmzTMzTMSV6dru3tj5Ukt0+Vnq1qhFEEwAg==",
       "dev": true,
       "license": "MIT",
+      "peer": true,
       "dependencies": {
         "ansi-colors": "^4.1.3",
         "browser-stdout": "^1.3.1",
@@ -3890,7 +3886,6 @@
       "version": "10.7.0",
       "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.7.0.tgz";,
       "integrity": 
"sha512-TbIGS4xgJoX2i3do417KSaep1uRAW/Lu+WAL2doDHC0D6ummjirVOXU5/7aiZotbQ5p1Zp9tP7U6cYhA0O7M8A==",
-      "peer": true,
       "dependencies": {
         "@cspotcode/source-map-support": "0.7.0",
         "@tsconfig/node10": "^1.0.7",
@@ -3933,7 +3928,6 @@
       "version": "4.0.2",
       "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz";,
       "integrity": 
"sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==",
-      "peer": true,
       "engines": {
         "node": ">=0.3.1"
       }
@@ -4069,6 +4063,7 @@
       "version": "4.7.4",
       "resolved": 
"https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz";,
       "integrity": 
"sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==",
+      "peer": true,
       "bin": {
         "tsc": "bin/tsc",
         "tsserver": "bin/tsserver"
@@ -4139,8 +4134,7 @@
     "node_modules/v8-compile-cache-lib": {
       "version": "3.0.0",
       "resolved": 
"https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz";,
-      "integrity": 
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==",
-      "peer": true
+      "integrity": 
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA=="
     },
     "node_modules/vscode-oniguruma": {
       "version": "1.7.0",
@@ -4302,7 +4296,6 @@
       "version": "3.1.1",
       "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz";,
       "integrity": 
"sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==",
-      "peer": true,
       "engines": {
         "node": ">=6"
       }
@@ -4324,14 +4317,12 @@
     "@cspotcode/source-map-consumer": {
       "version": "0.8.0",
       "resolved": 
"https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz";,
-      "integrity": 
"sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==",
-      "peer": true
+      "integrity": 
"sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg=="
     },
     "@cspotcode/source-map-support": {
       "version": "0.7.0",
       "resolved": 
"https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz";,
       "integrity": 
"sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==",
-      "peer": true,
       "requires": {
         "@cspotcode/source-map-consumer": "0.8.0"
       }
@@ -4440,6 +4431,7 @@
       "version": "1.4.6",
       "resolved": 
"https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.4.6.tgz";,
       "integrity": 
"sha512-Byau4xiXfIixb1PnW30V/P9mkrZ05lknyNqiK+cVY9J5hj3gecxd/anwaUbAM8j834zg1x78NvAbwGnMfWEu7A==",
+      "peer": true,
       "requires": {
         "@grpc/proto-loader": "^0.6.4",
         "@types/node": ">=12.12.47"
@@ -4707,26 +4699,22 @@
     "@tsconfig/node10": {
       "version": "1.0.8",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz";,
-      "integrity": 
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==",
-      "peer": true
+      "integrity": 
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg=="
     },
     "@tsconfig/node12": {
       "version": "1.0.9",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz";,
-      "integrity": 
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==",
-      "peer": true
+      "integrity": 
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw=="
     },
     "@tsconfig/node14": {
       "version": "1.0.1",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz";,
-      "integrity": 
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==",
-      "peer": true
+      "integrity": 
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg=="
     },
     "@tsconfig/node16": {
       "version": "1.0.2",
       "resolved": 
"https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz";,
-      "integrity": 
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==",
-      "peer": true
+      "integrity": 
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA=="
     },
     "@types/duplexify": {
       "version": "3.6.1",
@@ -4756,7 +4744,8 @@
     "@types/node": {
       "version": "17.0.8",
       "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.8.tgz";,
-      "integrity": 
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg=="
+      "integrity": 
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==",
+      "peer": true
     },
     "@typescript-eslint/eslint-plugin": {
       "version": "5.24.0",
@@ -4780,6 +4769,7 @@
       "resolved": 
"https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.24.0.tgz";,
       "integrity": 
"sha512-4q29C6xFYZ5B2CXqSBBdcS0lPyfM9M09DoQLtHS5kf+WbpV8pBBhHDLNhXfgyVwFnhrhYzOu7xmg02DzxeF2Uw==",
       "dev": true,
+      "peer": true,
       "requires": {
         "@typescript-eslint/scope-manager": "5.24.0",
         "@typescript-eslint/types": "5.24.0",
@@ -4870,7 +4860,8 @@
     "acorn": {
       "version": "8.7.1",
       "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.1.tgz";,
-      "integrity": 
"sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A=="
+      "integrity": 
"sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==",
+      "peer": true
     },
     "acorn-jsx": {
       "version": "5.3.2",
@@ -4882,8 +4873,7 @@
     "acorn-walk": {
       "version": "8.2.0",
       "resolved": 
"https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz";,
-      "integrity": 
"sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
-      "peer": true
+      "integrity": 
"sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA=="
     },
     "agent-base": {
       "version": "6.0.2",
@@ -4944,8 +4934,7 @@
     "arg": {
       "version": "4.1.3",
       "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz";,
-      "integrity": 
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==",
-      "peer": true
+      "integrity": 
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA=="
     },
     "argle": {
       "version": "1.1.1",
@@ -5213,8 +5202,7 @@
     "create-require": {
       "version": "1.1.1",
       "resolved": 
"https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz";,
-      "integrity": 
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
-      "peer": true
+      "integrity": 
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ=="
     },
     "cross-spawn": {
       "version": "7.0.3",
@@ -5376,6 +5364,7 @@
       "resolved": "https://registry.npmjs.org/eslint/-/eslint-8.15.0.tgz";,
       "integrity": 
"sha512-GG5USZ1jhCu8HJkzGgeK8/+RGnHaNYZGrGDzUtigK3BsGESW/rs2az23XqE0WVwDxy1VRvvjSSGu5nB0Bu+6SA==",
       "dev": true,
+      "peer": true,
       "requires": {
         "@eslint/eslintrc": "^1.2.3",
         "@humanwhocodes/config-array": "^0.9.2",
@@ -6402,8 +6391,7 @@
     "make-error": {
       "version": "1.3.6",
       "resolved": 
"https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz";,
-      "integrity": 
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==",
-      "peer": true
+      "integrity": 
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw=="
     },
     "marked": {
       "version": "4.2.5",
@@ -6462,6 +6450,7 @@
       "resolved": "https://registry.npmjs.org/mocha/-/mocha-11.1.0.tgz";,
       "integrity": 
"sha512-8uJR5RTC2NgpY3GrYcgpZrsEd9zKbPDpob1RezyR2upGHRQtHWofmzTMzTMSV6dru3tj5Ukt0+Vnq1qhFEEwAg==",
       "dev": true,
+      "peer": true,
       "requires": {
         "ansi-colors": "^4.1.3",
         "browser-stdout": "^1.3.1",
@@ -7113,7 +7102,6 @@
       "version": "10.7.0",
       "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.7.0.tgz";,
       "integrity": 
"sha512-TbIGS4xgJoX2i3do417KSaep1uRAW/Lu+WAL2doDHC0D6ummjirVOXU5/7aiZotbQ5p1Zp9tP7U6cYhA0O7M8A==",
-      "peer": true,
       "requires": {
         "@cspotcode/source-map-support": "0.7.0",
         "@tsconfig/node10": "^1.0.7",
@@ -7133,8 +7121,7 @@
         "diff": {
           "version": "4.0.2",
           "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz";,
-          "integrity": 
"sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==",
-          "peer": true
+          "integrity": 
"sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A=="
         }
       }
     },
@@ -7228,7 +7215,8 @@
     "typescript": {
       "version": "4.7.4",
       "resolved": 
"https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz";,
-      "integrity": 
"sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ=="
+      "integrity": 
"sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==",
+      "peer": true
     },
     "uglify-js": {
       "version": "3.15.1",
@@ -7282,8 +7270,7 @@
     "v8-compile-cache-lib": {
       "version": "3.0.0",
       "resolved": 
"https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz";,
-      "integrity": 
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==",
-      "peer": true
+      "integrity": 
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA=="
     },
     "vscode-oniguruma": {
       "version": "1.7.0",
@@ -7408,8 +7395,7 @@
     "yn": {
       "version": "3.1.1",
       "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz";,
-      "integrity": 
"sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==",
-      "peer": true
+      "integrity": 
"sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q=="
     },
     "yocto-queue": {
       "version": "0.1.0",
diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts
index 5a340cbb64f..5e7466a2a86 100644
--- a/sdks/typescript/src/apache_beam/worker/state.ts
+++ b/sdks/typescript/src/apache_beam/worker/state.ts
@@ -46,12 +46,110 @@ export interface StateProvider {
 }
 
 // TODO: (Advanced) Cross-bundle caching.
+/**
+ * Wrapper for cached values that tracks their weight (memory size).
+ */
+interface WeightedCacheEntry<T> {
+  entry: MaybePromise<T>;
+  weight: number;
+}
+
+// Default weight for values that cannot be sized (e.g., promises)
+const DEFAULT_WEIGHT = 64;
+
+/**
+ * Estimates the memory size of a value in bytes.
+ * Handles circular references by tracking visited objects.
+ */
+function sizeof(value: any, visited: Set<any> = new Set()): number {
+  if (value === null || value === undefined) {
+    return 8;
+  }
+
+  // Handle circular references for objects
+  if (typeof value === "object") {
+    if (visited.has(value)) {
+      return 8; // Account for reference size, not the full object again
+    }
+    visited.add(value);
+  }
+
+  const type = typeof value;
+
+  if (type === "boolean") {
+    return 4;
+  }
+  if (type === "number") {
+    return 8;
+  }
+  if (type === "string") {
+    // Each character is 2 bytes in JavaScript (UTF-16) + overhead
+    return 40 + value.length * 2;
+  }
+  if (value instanceof Uint8Array || value instanceof Buffer) {
+    return 40 + value.length;
+  }
+  if (Array.isArray(value)) {
+    let size = 40; // Array overhead
+    for (const item of value) {
+      size += sizeof(item, visited);
+    }
+    return size;
+  }
+  if (type === "object") {
+    let size = 40; // Object overhead
+    for (const key of Object.keys(value)) {
+      size += sizeof(key, visited) + sizeof(value[key], visited);
+    }
+    return size;
+  }
+
+  // Default for unknown types
+  return DEFAULT_WEIGHT;
+}
+
+// Default cache size: 100MB
+const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024;
+
 export class CachingStateProvider implements StateProvider {
   underlying: StateProvider;
-  cache: Map<string, MaybePromise<any>> = new Map();
+  cache: Map<string, WeightedCacheEntry<any>> = new Map();
+  maxCacheWeight: number;
+  currentWeight: number = 0;
 
-  constructor(underlying: StateProvider) {
+  constructor(
+    underlying: StateProvider,
+    maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
+  ) {
     this.underlying = underlying;
+    this.maxCacheWeight = maxCacheWeight;
+  }
+
+  /**
+   * Evicts least recently used entries until the cache is under the weight limit.
+   * JavaScript Maps preserve insertion order, so the first entry is the oldest.
+   */
+  private evictIfNeeded() {
+    while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
+      // Get the first (oldest) entry from the map iterator
+      const firstEntry = this.cache.entries().next().value;
+      const firstKey = firstEntry[0];
+      const evictedEntry = firstEntry[1];
+      this.currentWeight -= evictedEntry.weight;
+      this.cache.delete(firstKey);
+    }
+  }
+
+  /**
+   * Moves a cache entry to the end (most recently used) by deleting and re-adding it.
+   * This maintains LRU order: most recently accessed items are at the end.
+   */
+  private touchCacheEntry(cacheKey: string) {
+    const value = this.cache.get(cacheKey);
+    if (value !== undefined) {
+      this.cache.delete(cacheKey);
+      this.cache.set(cacheKey, value);
+    }
   }
 
   getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
@@ -62,21 +160,44 @@ export class CachingStateProvider implements StateProvider {
       "base64",
     );
     if (this.cache.has(cacheKey)) {
-      return this.cache.get(cacheKey)!;
+      // Cache hit: move to end (most recently used)
+      this.touchCacheEntry(cacheKey);
+      return this.cache.get(cacheKey)!.entry;
     }
+    // Cache miss: fetch from underlying provider
     let result = this.underlying.getState(stateKey, decode);
-    const this_ = this;
     if (result.type === "promise") {
       result = {
         type: "promise",
         promise: result.promise.then((value) => {
-          this_.cache.set(cacheKey, { type: "value", value });
+          // When promise resolves, update cache with resolved value
+          const currentEntry = this.cache.get(cacheKey);
+          // Only update if the entry in the cache is still the promise we are resolving.
+          // This prevents a race condition where the entry is evicted and replaced
+          // before this promise resolves.
+          if (currentEntry?.entry === result) {
+            // Remove old weight (of the promise) from total
+            this.currentWeight -= currentEntry.weight;
+
+            const resolvedWeight = sizeof(value);
+            this.cache.set(cacheKey, {
+              entry: { type: "value", value },
+              weight: resolvedWeight,
+            });
+            this.currentWeight += resolvedWeight;
+            this.evictIfNeeded();
+          }
           return value;
         }),
       };
     }
-    // TODO: (Perf) Cache eviction.
-    this.cache.set(cacheKey, result);
+    // Calculate weight for the new entry
+    const weight =
+      result.type === "value" ? sizeof(result.value) : DEFAULT_WEIGHT;
+    // Add new entry to cache and then evict if needed
+    this.currentWeight += weight;
+    this.cache.set(cacheKey, { entry: result, weight });
+    this.evictIfNeeded();
     return result;
   }
 }
diff --git a/sdks/typescript/test/state_provider_test.ts b/sdks/typescript/test/state_provider_test.ts
new file mode 100644
index 00000000000..30b71e78295
--- /dev/null
+++ b/sdks/typescript/test/state_provider_test.ts
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as assert from "assert";
+import {
+  CachingStateProvider,
+  StateProvider,
+  MaybePromise,
+} from "../src/apache_beam/worker/state";
+import * as fnApi from "../src/apache_beam/proto/beam_fn_api";
+
+/**
+ * Mock StateProvider for testing that tracks call counts.
+ */
+class MockStateProvider implements StateProvider {
+  callCount: number = 0;
+  values: Map<string, any> = new Map();
+  delayMs: number = 0;
+
+  constructor(delayMs: number = 0) {
+    this.delayMs = delayMs;
+  }
+
+  setValue(key: string, value: any) {
+    this.values.set(key, value);
+  }
+
+  getState<T>(
+    stateKey: fnApi.StateKey,
+    decode: (data: Uint8Array) => T,
+  ): MaybePromise<T> {
+    this.callCount++;
+    const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64",
+    );
+    const value = this.values.get(key);
+
+    if (this.delayMs > 0) {
+      return {
+        type: "promise",
+        promise: new Promise<T>((resolve) => {
+          setTimeout(() => resolve(value), this.delayMs);
+        }),
+      };
+    } else {
+      return { type: "value", value };
+    }
+  }
+}
+
+describe("CachingStateProvider", function () {
+  it("caches values and returns cached result on subsequent calls", function () {
+    const mockProvider = new MockStateProvider();
+    // Use large weight limit to ensure no eviction for this test
+    const cache = new CachingStateProvider(mockProvider, 10 * 1024);
+
+    const stateKey: fnApi.StateKey = {
+      type: {
+        oneofKind: "bagUserState",
+        bagUserState: fnApi.StateKey_BagUserState.create({
+          transformId: "test",
+          userStateId: "state1",
+          window: new Uint8Array(0),
+          key: new Uint8Array(0),
+        }),
+      },
+    };
+
+    const decode = (data: Uint8Array) => data.toString();
+
+    // Set value in mock
+    const testValue = "cached_value";
+    const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64",
+    );
+    mockProvider.setValue(key, testValue);
+
+    // First call should hit underlying provider
+    const result1 = cache.getState(stateKey, decode);
+    assert.equal(mockProvider.callCount, 1);
+    assert.equal(result1.type, "value");
+    if (result1.type === "value") {
+      assert.equal(result1.value, testValue);
+    }
+
+    // Second call should use cache
+    const result2 = cache.getState(stateKey, decode);
+    assert.equal(mockProvider.callCount, 1); // Still 1, not 2
+    assert.equal(result2.type, "value");
+    if (result2.type === "value") {
+      assert.equal(result2.value, testValue);
+    }
+  });
+
+  it("evicts least recently used entry when cache weight exceeds limit", function () {
+    const mockProvider = new MockStateProvider();
+    // Each small string "valueX" is approximately 52 bytes (40 + 6*2)
+    // Set weight limit to hold approximately 3 entries
+    const cache = new CachingStateProvider(mockProvider, 180);
+
+    const decode = (data: Uint8Array) => data.toString();
+
+    // Create 4 different state keys
+    const keys: fnApi.StateKey[] = [];
+    for (let i = 0; i < 4; i++) {
+      keys.push({
+        type: {
+          oneofKind: "bagUserState",
+          bagUserState: fnApi.StateKey_BagUserState.create({
+            transformId: "test",
+            userStateId: `state${i}`,
+            window: new Uint8Array(0),
+            key: new Uint8Array(0),
+          }),
+        },
+      });
+    }
+
+    // Set values in mock
+    for (let i = 0; i < 4; i++) {
+      const key = Buffer.from(fnApi.StateKey.toBinary(keys[i])).toString(
+        "base64",
+      );
+      mockProvider.setValue(key, `value${i}`);
+    }
+
+    // Fill cache with 3 entries
+    cache.getState(keys[0], decode);
+    cache.getState(keys[1], decode);
+    cache.getState(keys[2], decode);
+    assert.equal(mockProvider.callCount, 3);
+    assert.equal(cache.cache.size, 3);
+
+    // Access keys[0] to make it most recently used
+    cache.getState(keys[0], decode);
+    assert.equal(mockProvider.callCount, 3); // Still cached
+
+    // Add 4th entry - should evict keys[1] (least recently used, not keys[0])
+    cache.getState(keys[3], decode);
+    assert.equal(mockProvider.callCount, 4);
+
+    // keys[1] should be evicted (not in cache)
+    const result1 = cache.getState(keys[1], decode);
+    assert.equal(mockProvider.callCount, 5); // Had to fetch again
+    assert.equal(result1.type, "value");
+    if (result1.type === "value") {
+      assert.equal(result1.value, "value1");
+    }
+
+    // keys[0] should still be cached (was most recently used)
+    const result0 = cache.getState(keys[0], decode);
+    assert.equal(mockProvider.callCount, 5); // Still cached, no new call
+    assert.equal(result0.type, "value");
+    if (result0.type === "value") {
+      assert.equal(result0.value, "value0");
+    }
+  });
+
+  it("handles promise-based state fetches correctly", async function () {
+    const mockProvider = new MockStateProvider(10); // 10ms delay
+    // Use large weight limit to ensure no eviction for this test
+    const cache = new CachingStateProvider(mockProvider, 10 * 1024);
+
+    const stateKey: fnApi.StateKey = {
+      type: {
+        oneofKind: "bagUserState",
+        bagUserState: fnApi.StateKey_BagUserState.create({
+          transformId: "test",
+          userStateId: "async_state",
+          window: new Uint8Array(0),
+          key: new Uint8Array(0),
+        }),
+      },
+    };
+
+    const decode = (data: Uint8Array) => data.toString();
+    const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64",
+    );
+    mockProvider.setValue(key, "async_value");
+
+    // First call returns promise
+    const result1 = cache.getState(stateKey, decode);
+    assert.equal(result1.type, "promise");
+    assert.equal(mockProvider.callCount, 1);
+
+    // Wait for promise to resolve
+    if (result1.type === "promise") {
+      const value1 = await result1.promise;
+      assert.equal(value1, "async_value");
+
+      // Second call should return cached value (not promise)
+      const result2 = cache.getState(stateKey, decode);
+      assert.equal(result2.type, "value");
+      assert.equal(mockProvider.callCount, 1); // Still only 1 call
+      if (result2.type === "value") {
+        assert.equal(result2.value, "async_value");
+      }
+    }
+  });
+
+  it("respects custom maxCacheWeight and evicts based on memory size", function () {
+    const mockProvider = new MockStateProvider();
+    // Set weight limit to hold approximately 2 small string entries
+    const cache = new CachingStateProvider(mockProvider, 120);
+
+    const decode = (data: Uint8Array) => data.toString();
+
+    const keys: fnApi.StateKey[] = [];
+    for (let i = 0; i < 3; i++) {
+      keys.push({
+        type: {
+          oneofKind: "bagUserState",
+          bagUserState: fnApi.StateKey_BagUserState.create({
+            transformId: "test",
+            userStateId: `state${i}`,
+            window: new Uint8Array(0),
+            key: new Uint8Array(0),
+          }),
+        },
+      });
+      const key = Buffer.from(fnApi.StateKey.toBinary(keys[i])).toString(
+        "base64",
+      );
+      mockProvider.setValue(key, `value${i}`);
+    }
+
+    // Fill cache with 2 entries
+    cache.getState(keys[0], decode);
+    cache.getState(keys[1], decode);
+    assert.equal(cache.cache.size, 2);
+
+    // Add 3rd entry - should evict oldest to stay under weight limit
+    cache.getState(keys[2], decode);
+
+    // First entry should be evicted
+    cache.getState(keys[0], decode);
+    assert.equal(mockProvider.callCount, 4); // Had to fetch keys[0] again
+  });
+
+  it("tracks cache weight correctly", function () {
+    const mockProvider = new MockStateProvider();
+    const cache = new CachingStateProvider(mockProvider, 10 * 1024);
+
+    const decode = (data: Uint8Array) => data.toString();
+
+    const stateKey: fnApi.StateKey = {
+      type: {
+        oneofKind: "bagUserState",
+        bagUserState: fnApi.StateKey_BagUserState.create({
+          transformId: "test",
+          userStateId: "state1",
+          window: new Uint8Array(0),
+          key: new Uint8Array(0),
+        }),
+      },
+    };
+
+    const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64",
+    );
+    mockProvider.setValue(key, "test_value"); // keyed by base64 of the serialized StateKey
+
+    // A freshly constructed cache should report a currentWeight of 0
+    assert.equal(cache.currentWeight, 0);
+
+    // After fetching one entry, currentWeight should be positive
+    cache.getState(stateKey, decode);
+    assert.ok(cache.currentWeight > 0);
+  });
+
+  it("evicts oversized item that exceeds maxCacheWeight", function () {
+    const mockProvider = new MockStateProvider();
+    // Set a very small weight limit (10 bytes) so the value below cannot fit
+    const cache = new CachingStateProvider(mockProvider, 10);
+
+    const decode = (data: Uint8Array) => data.toString();
+
+    const stateKey: fnApi.StateKey = {
+      type: {
+        oneofKind: "bagUserState",
+        bagUserState: fnApi.StateKey_BagUserState.create({
+          transformId: "test",
+          userStateId: "oversized_state",
+          window: new Uint8Array(0),
+          key: new Uint8Array(0),
+        }),
+      },
+    };
+
+    const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64",
+    );
+    // Register a value whose weight is expected to exceed the cache weight limit
+    const largeValue = "this_is_a_very_large_value_that_exceeds_the_limit";
+    mockProvider.setValue(key, largeValue);
+
+    // Precondition: the cache starts with no entries and zero weight
+    assert.equal(cache.cache.size, 0);
+    assert.equal(cache.currentWeight, 0);
+
+    // Fetch the oversized item - it should be added and then immediately evicted
+    cache.getState(stateKey, decode);
+
+    // Size and weight should both be back to 0 (item was added then evicted)
+    assert.equal(cache.cache.size, 0);
+    assert.equal(cache.currentWeight, 0);
+
+    // Fetching again should hit the underlying provider since item was evicted
+    cache.getState(stateKey, decode);
+    assert.equal(mockProvider.callCount, 2); // one provider call per getState; nothing served from cache
+  });
+});


Reply via email to