This is an automated email from the ASF dual-hosted git repository.
villebro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 19f949276c refactor(config): SIGNAL_CACHE_CONFIG →
DISTRIBUTED_COORDINATION_CONFIG (#38395)
19f949276c is described below
commit 19f949276cee4ed96e40749ba2bbb84f728f22f7
Author: Michael S. Molina <[email protected]>
AuthorDate: Wed Mar 4 14:40:21 2026 -0300
refactor(config): SIGNAL_CACHE_CONFIG → DISTRIBUTED_COORDINATION_CONFIG
(#38395)
---
UPDATING.md | 8 ++---
docs/admin_docs/configuration/cache.mdx | 16 +++++-----
docs/developer_docs/extensions/tasks.md | 4 +--
superset/commands/distributed_lock/acquire.py | 2 +-
superset/commands/distributed_lock/base.py | 6 ++--
superset/commands/distributed_lock/release.py | 2 +-
superset/config.py | 20 +++++++------
superset/distributed_lock/__init__.py | 2 +-
superset/tasks/locks.py | 4 +--
superset/tasks/manager.py | 8 ++---
superset/utils/cache_manager.py | 34 +++++++++++++---------
.../integration_tests/tasks/test_sync_join_wait.py | 4 +--
tests/unit_tests/tasks/test_handlers.py | 4 +--
tests/unit_tests/tasks/test_manager.py | 34 +++++++++++-----------
tests/unit_tests/tasks/test_timeout.py | 32 ++++++++++----------
15 files changed, 95 insertions(+), 85 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 7b887b1aa3..455e9a10ba 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -64,19 +64,19 @@ ORDER BY total_calls DESC;
**Security note:** Sensitive parameters (passwords, API keys, tokens) are
automatically redacted in logs as `[REDACTED]`.
-### Signal Cache Backend
+### Distributed Coordination Backend
-A new `SIGNAL_CACHE_CONFIG` configuration provides a unified Redis-based
backend for real-time coordination features in Superset. This backend enables:
+A new `DISTRIBUTED_COORDINATION_CONFIG` configuration provides a unified
Redis-based backend for real-time coordination features in Superset. This
backend enables:
- **Pub/sub messaging** for real-time event notifications between workers
- **Atomic distributed locking** using Redis SET NX EX (more performant than
database-backed locks)
- **Event-based coordination** for background task management
-The signal cache is used by the Global Task Framework (GTF) for abort
notifications and task completion signaling, and will eventually replace
`GLOBAL_ASYNC_QUERIES_CACHE_BACKEND` as the standard signaling backend.
Configuring this is recommended for Redis enabled production deployments.
+The distributed coordination is used by the Global Task Framework (GTF) for
abort notifications and task completion signaling, and will eventually replace
`GLOBAL_ASYNC_QUERIES_CACHE_BACKEND` as the standard signaling backend.
Configuring this is recommended for Redis enabled production deployments.
Example configuration in `superset_config.py`:
```python
-SIGNAL_CACHE_CONFIG = {
+DISTRIBUTED_COORDINATION_CONFIG = {
"CACHE_TYPE": "RedisCache",
"CACHE_KEY_PREFIX": "signal_",
"CACHE_REDIS_URL": "redis://localhost:6379/1",
diff --git a/docs/admin_docs/configuration/cache.mdx
b/docs/admin_docs/configuration/cache.mdx
index 4bb3748318..be1459f09f 100644
--- a/docs/admin_docs/configuration/cache.mdx
+++ b/docs/admin_docs/configuration/cache.mdx
@@ -159,9 +159,9 @@ Then on configuration:
WEBDRIVER_AUTH_FUNC = auth_driver
```
-## Signal Cache Backend
+## Distributed Coordination Backend
-Superset supports an optional signal cache (`SIGNAL_CACHE_CONFIG`) for
+Superset supports an optional distributed coordination
(`DISTRIBUTED_COORDINATION_CONFIG`) for
high-performance distributed operations. This configuration enables:
- **Distributed locking**: Moves lock operations from the metadata database to
Redis, improving
@@ -176,11 +176,11 @@ that are not available in general Flask-Caching backends.
### Configuration
-The signal cache uses Flask-Caching style configuration for consistency with
other cache
-backends. Configure `SIGNAL_CACHE_CONFIG` in `superset_config.py`:
+The distributed coordination uses Flask-Caching style configuration for
consistency with other cache
+backends. Configure `DISTRIBUTED_COORDINATION_CONFIG` in `superset_config.py`:
```python
-SIGNAL_CACHE_CONFIG = {
+DISTRIBUTED_COORDINATION_CONFIG = {
"CACHE_TYPE": "RedisCache",
"CACHE_REDIS_HOST": "localhost",
"CACHE_REDIS_PORT": 6379,
@@ -192,7 +192,7 @@ SIGNAL_CACHE_CONFIG = {
For Redis Sentinel deployments:
```python
-SIGNAL_CACHE_CONFIG = {
+DISTRIBUTED_COORDINATION_CONFIG = {
"CACHE_TYPE": "RedisSentinelCache",
"CACHE_REDIS_SENTINELS": [("sentinel1", 26379), ("sentinel2", 26379)],
"CACHE_REDIS_SENTINEL_MASTER": "mymaster",
@@ -205,7 +205,7 @@ SIGNAL_CACHE_CONFIG = {
For SSL/TLS connections:
```python
-SIGNAL_CACHE_CONFIG = {
+DISTRIBUTED_COORDINATION_CONFIG = {
"CACHE_TYPE": "RedisCache",
"CACHE_REDIS_HOST": "redis.example.com",
"CACHE_REDIS_PORT": 6380,
@@ -229,7 +229,7 @@ Individual lock acquisitions can override this value when
needed.
### Database-Only Mode
-When `SIGNAL_CACHE_CONFIG` is not configured, Superset uses database-backed
operations:
+When `DISTRIBUTED_COORDINATION_CONFIG` is not configured, Superset uses
database-backed operations:
- **Locking**: Uses the KeyValue table with periodic cleanup of expired entries
- **Event notifications**: Uses database polling instead of pub/sub
diff --git a/docs/developer_docs/extensions/tasks.md
b/docs/developer_docs/extensions/tasks.md
index d1f33aa147..47e0e7cf56 100644
--- a/docs/developer_docs/extensions/tasks.md
+++ b/docs/developer_docs/extensions/tasks.md
@@ -369,8 +369,8 @@ The prune job only removes tasks in terminal states
(`SUCCESS`, `FAILURE`, `ABOR
See `superset/config.py` for a complete example configuration.
-:::tip Signal Cache for Faster Notifications
-By default, abort detection and sync join-and-wait use database polling.
Configure `SIGNAL_CACHE_CONFIG` to enable Redis pub/sub for real-time
notifications. See [Signal Cache
Backend](/admin-docs/configuration/cache#signal-cache-backend) for
configuration details.
+:::tip Distributed Coordination for Faster Notifications
+By default, abort detection and sync join-and-wait use database polling.
Configure `DISTRIBUTED_COORDINATION_CONFIG` to enable Redis pub/sub for
real-time notifications. See [Distributed Coordination
Backend](/admin-docs/configuration/cache#signal-cache-backend) for
configuration details.
:::
## API Reference
diff --git a/superset/commands/distributed_lock/acquire.py
b/superset/commands/distributed_lock/acquire.py
index e06439a49b..c25501f792 100644
--- a/superset/commands/distributed_lock/acquire.py
+++ b/superset/commands/distributed_lock/acquire.py
@@ -46,7 +46,7 @@ class AcquireDistributedLock(BaseDistributedLockCommand):
"""
Acquire a distributed lock with automatic backend selection.
- Uses Redis SET NX EX when SIGNAL_CACHE_CONFIG is configured,
+ Uses Redis SET NX EX when DISTRIBUTED_COORDINATION_CONFIG is configured,
otherwise falls back to KeyValue table.
Raises AcquireDistributedLockFailedException if:
diff --git a/superset/commands/distributed_lock/base.py
b/superset/commands/distributed_lock/base.py
index 3317887d8c..55da69f7eb 100644
--- a/superset/commands/distributed_lock/base.py
+++ b/superset/commands/distributed_lock/base.py
@@ -41,12 +41,12 @@ def get_default_lock_ttl() -> int:
def get_redis_client() -> "redis.Redis[Any] | None":
"""
- Get Redis client from signal cache if available.
+ Get Redis client from distributed coordination if available.
- Returns None if SIGNAL_CACHE_CONFIG is not configured,
+ Returns None if DISTRIBUTED_COORDINATION_CONFIG is not configured,
allowing fallback to database-backed locking.
"""
- backend = cache_manager.signal_cache
+ backend = cache_manager.distributed_coordination
return backend._cache if backend else None
diff --git a/superset/commands/distributed_lock/release.py
b/superset/commands/distributed_lock/release.py
index 6d98f82aa4..14f9deb4df 100644
--- a/superset/commands/distributed_lock/release.py
+++ b/superset/commands/distributed_lock/release.py
@@ -40,7 +40,7 @@ class ReleaseDistributedLock(BaseDistributedLockCommand):
"""
Release a distributed lock with automatic backend selection.
- Uses Redis DELETE when SIGNAL_CACHE_CONFIG is configured,
+ Uses Redis DELETE when DISTRIBUTED_COORDINATION_CONFIG is configured,
otherwise deletes from KeyValue table.
"""
diff --git a/superset/config.py b/superset/config.py
index dfeafb3b26..30f0f80122 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -2485,28 +2485,30 @@ TASK_ABORT_POLLING_DEFAULT_INTERVAL = 10
TASK_PROGRESS_UPDATE_THROTTLE_INTERVAL = 2 # seconds
# ---------------------------------------------------
-# Signal Cache Configuration
+# Distributed Coordination Configuration
# ---------------------------------------------------
-# Shared Redis/Valkey configuration for signaling features that require
-# Redis-specific primitives (pub/sub messaging, distributed locks).
+# Shared Redis/Valkey backend for distributed coordination primitives.
#
# Uses Flask-Caching style configuration for consistency with other cache
backends.
# Set CACHE_TYPE to 'RedisCache' for standard Redis or 'RedisSentinelCache' for
# Sentinel.
#
-# These features cannot use generic cache backends because they rely on:
+# These features require Redis primitives unavailable in generic cache
backends:
# - Pub/Sub: Real-time message broadcasting between workers
# - SET NX EX: Atomic lock acquisition with automatic expiration
+# - Streams: Persistent ordered event logs (future)
#
# When configured, enables:
# - Real-time abort/completion notifications for GTF tasks (vs database
polling)
# - Redis-based distributed locking (vs KeyValueDAO-backed DistributedLock)
#
-# Future: This cache will also be used by Global Async Queries, consolidating
-# GLOBAL_ASYNC_QUERIES_CACHE_BACKEND into this unified configuration.
+# Future: This backend will power a higher-level coordination service exposing
+# standardized interfaces for distributed locks, pub/sub, and streams —
consolidating
+# all advanced Redis primitives under a single connection. Global Async Queries
+# (GLOBAL_ASYNC_QUERIES_CACHE_BACKEND) will also be migrated to this
configuration.
#
# Example with standard Redis:
-# SIGNAL_CACHE_CONFIG: CacheConfig = {
+# DISTRIBUTED_COORDINATION_CONFIG: CacheConfig = {
# "CACHE_TYPE": "RedisCache",
# "CACHE_REDIS_HOST": "localhost",
# "CACHE_REDIS_PORT": 6379,
@@ -2515,7 +2517,7 @@ TASK_PROGRESS_UPDATE_THROTTLE_INTERVAL = 2 # seconds
# }
#
# Example with Redis Sentinel:
-# SIGNAL_CACHE_CONFIG: CacheConfig = {
+# DISTRIBUTED_COORDINATION_CONFIG: CacheConfig = {
# "CACHE_TYPE": "RedisSentinelCache",
# "CACHE_REDIS_SENTINELS": [("sentinel1", 26379), ("sentinel2", 26379)],
# "CACHE_REDIS_SENTINEL_MASTER": "mymaster",
@@ -2523,7 +2525,7 @@ TASK_PROGRESS_UPDATE_THROTTLE_INTERVAL = 2 # seconds
# "CACHE_REDIS_DB": 0,
# "CACHE_REDIS_PASSWORD": "",
# }
-SIGNAL_CACHE_CONFIG: CacheConfig | None = None
+DISTRIBUTED_COORDINATION_CONFIG: CacheConfig | None = None
# Default lock TTL (time-to-live) in seconds for distributed locks.
# Can be overridden per-call via the `ttl_seconds` parameter.
diff --git a/superset/distributed_lock/__init__.py
b/superset/distributed_lock/__init__.py
index 69ec3d8253..ab00143ba3 100644
--- a/superset/distributed_lock/__init__.py
+++ b/superset/distributed_lock/__init__.py
@@ -34,7 +34,7 @@ def DistributedLock( # noqa: N802
"""
Distributed lock for coordinating operations across workers.
- Automatically uses Redis-based locking when SIGNAL_CACHE_CONFIG is
+ Automatically uses Redis-based locking when
DISTRIBUTED_COORDINATION_CONFIG is
configured, falling back to database-backed locking otherwise.
Redis locking uses SET NX EX for atomic acquisition with automatic
expiration.
diff --git a/superset/tasks/locks.py b/superset/tasks/locks.py
index f6af3df13c..73875925e3 100644
--- a/superset/tasks/locks.py
+++ b/superset/tasks/locks.py
@@ -22,7 +22,7 @@ conditions during concurrent task creation, subscription, and
cancellation.
The lock key uses the task's dedup_key, ensuring all operations on the same
logical task serialize correctly.
-When SIGNAL_CACHE_CONFIG is configured, uses Redis SET NX EX for
+When DISTRIBUTED_COORDINATION_CONFIG is configured, uses Redis SET NX EX for
efficient single-command locking. Otherwise falls back to database-backed
locking via DistributedLock.
"""
@@ -55,7 +55,7 @@ def task_lock(dedup_key: str) -> Iterator[None]:
- Subscribe racing with cancel
- Multiple concurrent cancel requests
- When SIGNAL_CACHE_CONFIG is configured, uses Redis SET NX EX
+ When DISTRIBUTED_COORDINATION_CONFIG is configured, uses Redis SET NX EX
for efficient single-command locking. Otherwise falls back to
database-backed DistributedLock.
diff --git a/superset/tasks/manager.py b/superset/tasks/manager.py
index 21b28c7d42..def7bf1e0c 100644
--- a/superset/tasks/manager.py
+++ b/superset/tasks/manager.py
@@ -103,7 +103,7 @@ class TaskManager:
3. Handling deduplication (returning existing active task if duplicate)
4. Managing real-time abort notifications (optional)
- Redis pub/sub is opt-in via SIGNAL_CACHE_CONFIG configuration. When not
+ Redis pub/sub is opt-in via DISTRIBUTED_COORDINATION_CONFIG configuration.
When not
configured, tasks use database polling for abort detection.
"""
@@ -134,11 +134,11 @@ class TaskManager:
@classmethod
def _get_cache(cls) -> RedisCacheBackend | RedisSentinelCacheBackend |
None:
"""
- Get the signal cache backend.
+ Get the distributed coordination backend.
- :returns: The signal cache backend, or None if not configured
+ :returns: The distributed coordination backend, or None if not
configured
"""
- return cache_manager.signal_cache
+ return cache_manager.distributed_coordination
@classmethod
def is_pubsub_available(cls) -> bool:
diff --git a/superset/utils/cache_manager.py b/superset/utils/cache_manager.py
index 48ff0e11cd..7ab54dead1 100644
--- a/superset/utils/cache_manager.py
+++ b/superset/utils/cache_manager.py
@@ -193,7 +193,9 @@ class CacheManager:
self._thumbnail_cache = SupersetCache()
self._filter_state_cache = SupersetCache()
self._explore_form_data_cache = ExploreFormDataCache()
- self._signal_cache: RedisCacheBackend | RedisSentinelCacheBackend |
None = None
+ self._distributed_coordination: (
+ RedisCacheBackend | RedisSentinelCacheBackend | None
+ ) = None
@staticmethod
def _init_cache(
@@ -235,27 +237,29 @@ class CacheManager:
"EXPLORE_FORM_DATA_CACHE_CONFIG",
required=True,
)
- self._init_signal_cache(app)
+ self._init_distributed_coordination(app)
- def _init_signal_cache(self, app: Flask) -> None:
- """Initialize the signal cache for pub/sub and distributed locks."""
+ def _init_distributed_coordination(self, app: Flask) -> None:
+ """Initialize the distributed coordination backend (pub/sub, locks,
streams)."""
from superset.async_events.cache_backend import (
RedisCacheBackend,
RedisSentinelCacheBackend,
)
- config = app.config.get("SIGNAL_CACHE_CONFIG")
+ config = app.config.get("DISTRIBUTED_COORDINATION_CONFIG")
if not config:
return
cache_type = config.get("CACHE_TYPE")
if cache_type == "RedisCache":
- self._signal_cache = RedisCacheBackend.from_config(config)
+ self._distributed_coordination =
RedisCacheBackend.from_config(config)
elif cache_type == "RedisSentinelCache":
- self._signal_cache = RedisSentinelCacheBackend.from_config(config)
+ self._distributed_coordination =
RedisSentinelCacheBackend.from_config(
+ config
+ )
else:
logger.warning(
- "Unsupported CACHE_TYPE for SIGNAL_CACHE_CONFIG: %s. "
+ "Unsupported CACHE_TYPE for DISTRIBUTED_COORDINATION_CONFIG:
%s. "
"Use 'RedisCache' or 'RedisSentinelCache'.",
cache_type,
)
@@ -281,13 +285,17 @@ class CacheManager:
return self._explore_form_data_cache
@property
- def signal_cache(
+ def distributed_coordination(
self,
) -> RedisCacheBackend | RedisSentinelCacheBackend | None:
"""
- Return the signal cache backend.
+ Return the distributed coordination backend for Redis-specific
primitives.
+
+ This backend is the foundation for distributed coordination features
including
+ pub/sub messaging, atomic distributed locking, and streams. A
higher-level
+ service will eventually expose standardized interfaces on top of this
backend.
- Used for signaling features that require Redis-specific primitives:
+ Coordination primitives currently backed by this:
- Pub/Sub messaging for real-time abort/completion notifications
- SET NX EX for atomic distributed lock acquisition
@@ -296,6 +304,6 @@ class CacheManager:
- `.key_prefix`: Configured key prefix (from CACHE_KEY_PREFIX)
- `.default_timeout`: Default timeout in seconds (from
CACHE_DEFAULT_TIMEOUT)
- Returns None if SIGNAL_CACHE_CONFIG is not configured.
+ Returns None if DISTRIBUTED_COORDINATION_CONFIG is not configured.
"""
- return self._signal_cache
+ return self._distributed_coordination
diff --git a/tests/integration_tests/tasks/test_sync_join_wait.py
b/tests/integration_tests/tasks/test_sync_join_wait.py
index 88702b6fce..ff00958cf5 100644
--- a/tests/integration_tests/tasks/test_sync_join_wait.py
+++ b/tests/integration_tests/tasks/test_sync_join_wait.py
@@ -89,9 +89,9 @@ def test_wait_for_completion_timeout(app_context, login_as,
get_user) -> None:
assert task.user_id == admin.id
try:
- # Force polling mode by mocking signal_cache as None
+ # Force polling mode by mocking distributed_coordination as None
with patch("superset.tasks.manager.cache_manager") as
mock_cache_manager:
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
with pytest.raises(TimeoutError):
TaskManager.wait_for_completion(
task.uuid,
diff --git a/tests/unit_tests/tasks/test_handlers.py
b/tests/unit_tests/tasks/test_handlers.py
index 1da4b4da93..7c29a961a8 100644
--- a/tests/unit_tests/tasks/test_handlers.py
+++ b/tests/unit_tests/tasks/test_handlers.py
@@ -82,8 +82,8 @@ def task_context(mock_task, mock_task_dao,
mock_update_command, mock_flask_app):
patch("superset.tasks.context.current_app") as mock_current_app,
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
# Configure current_app mock
mock_current_app.config = mock_flask_app.config
diff --git a/tests/unit_tests/tasks/test_manager.py
b/tests/unit_tests/tasks/test_manager.py
index 4fc77c2e08..9f10c4da59 100644
--- a/tests/unit_tests/tasks/test_manager.py
+++ b/tests/unit_tests/tasks/test_manager.py
@@ -123,13 +123,13 @@ class TestTaskManagerPubSub:
@patch("superset.tasks.manager.cache_manager")
def test_is_pubsub_available_no_redis(self, mock_cache_manager):
"""Test is_pubsub_available returns False when Redis not configured"""
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
assert TaskManager.is_pubsub_available() is False
@patch("superset.tasks.manager.cache_manager")
def test_is_pubsub_available_with_redis(self, mock_cache_manager):
"""Test is_pubsub_available returns True when Redis is configured"""
- mock_cache_manager.signal_cache = MagicMock()
+ mock_cache_manager.distributed_coordination = MagicMock()
assert TaskManager.is_pubsub_available() is True
def test_get_abort_channel(self):
@@ -148,7 +148,7 @@ class TestTaskManagerPubSub:
@patch("superset.tasks.manager.cache_manager")
def test_publish_abort_no_redis(self, mock_cache_manager):
"""Test publish_abort returns False when Redis not available"""
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
result = TaskManager.publish_abort("test-uuid")
assert result is False
@@ -157,7 +157,7 @@ class TestTaskManagerPubSub:
"""Test publish_abort publishes message successfully"""
mock_redis = MagicMock()
mock_redis.publish.return_value = 1 # One subscriber
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_abort("test-uuid")
@@ -169,7 +169,7 @@ class TestTaskManagerPubSub:
"""Test publish_abort handles Redis errors gracefully"""
mock_redis = MagicMock()
mock_redis.publish.side_effect = redis.RedisError("Connection lost")
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_abort("test-uuid")
@@ -194,7 +194,7 @@ class TestTaskManagerListenForAbort:
@patch("superset.tasks.manager.cache_manager")
def test_listen_for_abort_no_redis_uses_polling(self, mock_cache_manager):
"""Test listen_for_abort falls back to polling when Redis
unavailable"""
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
callback = MagicMock()
with patch.object(TaskManager, "_poll_for_abort", return_value=None):
@@ -218,7 +218,7 @@ class TestTaskManagerListenForAbort:
mock_redis = MagicMock()
mock_pubsub = MagicMock()
mock_redis.pubsub.return_value = mock_pubsub
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
callback = MagicMock()
@@ -245,7 +245,7 @@ class TestTaskManagerListenForAbort:
mock_redis = MagicMock()
mock_redis.pubsub.side_effect = redis.RedisError("Connection failed")
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
callback = MagicMock()
@@ -290,7 +290,7 @@ class TestTaskManagerCompletion:
@patch("superset.tasks.manager.cache_manager")
def test_publish_completion_no_redis(self, mock_cache_manager):
"""Test publish_completion returns False when Redis not available"""
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
result = TaskManager.publish_completion("test-uuid", "success")
assert result is False
@@ -299,7 +299,7 @@ class TestTaskManagerCompletion:
"""Test publish_completion publishes message successfully"""
mock_redis = MagicMock()
mock_redis.publish.return_value = 1 # One subscriber
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_completion("test-uuid", "success")
@@ -311,7 +311,7 @@ class TestTaskManagerCompletion:
"""Test publish_completion handles Redis errors gracefully"""
mock_redis = MagicMock()
mock_redis.publish.side_effect = redis.RedisError("Connection lost")
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_completion("test-uuid", "success")
@@ -323,7 +323,7 @@ class TestTaskManagerCompletion:
"""Test wait_for_completion raises ValueError for missing task"""
import pytest
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
mock_dao.find_one_or_none.return_value = None
with pytest.raises(ValueError, match="not found"):
@@ -333,7 +333,7 @@ class TestTaskManagerCompletion:
@patch("superset.daos.tasks.TaskDAO")
def test_wait_for_completion_already_complete(self, mock_dao,
mock_cache_manager):
"""Test wait_for_completion returns immediately for terminal state"""
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
mock_task = MagicMock()
mock_task.uuid = "test-uuid"
mock_task.status = "success"
@@ -351,7 +351,7 @@ class TestTaskManagerCompletion:
"""Test wait_for_completion raises TimeoutError when timeout expires"""
import pytest
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
mock_task = MagicMock()
mock_task.uuid = "test-uuid"
mock_task.status = "in_progress" # Never completes
@@ -364,7 +364,7 @@ class TestTaskManagerCompletion:
@patch("superset.daos.tasks.TaskDAO")
def test_wait_for_completion_polling_success(self, mock_dao,
mock_cache_manager):
"""Test wait_for_completion returns when task completes via polling"""
- mock_cache_manager.signal_cache = None
+ mock_cache_manager.distributed_coordination = None
mock_task_pending = MagicMock()
mock_task_pending.uuid = "test-uuid"
mock_task_pending.status = "pending"
@@ -414,7 +414,7 @@ class TestTaskManagerCompletion:
"data": "success",
}
mock_redis.pubsub.return_value = mock_pubsub
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.wait_for_completion(
"test-uuid",
@@ -446,7 +446,7 @@ class TestTaskManagerCompletion:
# Set up mock Redis that fails
mock_redis = MagicMock()
mock_redis.pubsub.side_effect = redis.RedisError("Connection failed")
- mock_cache_manager.signal_cache = mock_redis
+ mock_cache_manager.distributed_coordination = mock_redis
# With fail-fast behavior, Redis error is raised instead of falling
back
with pytest.raises(redis.RedisError, match="Connection failed"):
diff --git a/tests/unit_tests/tasks/test_timeout.py
b/tests/unit_tests/tasks/test_timeout.py
index ef8d5f9d76..002166fc37 100644
--- a/tests/unit_tests/tasks/test_timeout.py
+++ b/tests/unit_tests/tasks/test_timeout.py
@@ -89,8 +89,8 @@ def task_context_for_timeout(mock_flask_app,
mock_task_abortable):
patch("superset.daos.tasks.TaskDAO") as mock_dao,
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
# Configure current_app mock
mock_current_app.config = mock_flask_app.config
@@ -277,8 +277,8 @@ class TestTimeoutTrigger:
) as mock_update_cmd,
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -323,8 +323,8 @@ class TestTimeoutTrigger:
patch("superset.tasks.context.logger") as mock_logger,
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -363,8 +363,8 @@ class TestTimeoutTrigger:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -469,8 +469,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -510,8 +510,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -547,8 +547,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -584,8 +584,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as
mock_cache_manager,
):
- # Disable Redis by making signal_cache return None
- mock_cache_manager.signal_cache = None
+ # Disable Redis by making distributed_coordination return None
+ mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app