This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 05f00d654ae Cleaning up asset_state references in task sdk (#68429)
05f00d654ae is described below
commit 05f00d654ae1dad35c3ab04bd6e4ec2ee02a4745
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Jun 15 12:11:35 2026 +0530
Cleaning up asset_state references in task sdk (#68429)
---
.../task_sdk/execution_time/test_task_runner.py | 22 ++++++++++++----------
1 file changed, 12 insertions(+), 10 deletions(-)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 2e87b4139d9..8c1312833e8 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5678,7 +5678,7 @@ class TestTaskInstanceStateOperations:
return AssetResult(name=actual.uri, uri=actual.uri, group="asset")
return OKResponse(ok=True)
- def test_asset_state_get_and_set(self, create_runtime_ti, mock_supervisor_comms):
+ def test_asset_state_store_get_and_set(self, create_runtime_ti, mock_supervisor_comms):
watched = Asset(name="my_asset", uri="s3://bucket/data")
class WatcherOperator(BaseOperator):
@@ -5697,7 +5697,9 @@ class TestTaskInstanceStateOperations:
)
mock_supervisor_comms.send.assert_any_call(GetAssetStateStoreByName(name="my_asset", key="watermark"))
- def test_asset_state_get_returns_default_when_key_missing(self,
create_runtime_ti, mock_supervisor_comms):
+ def test_asset_state_store_get_returns_default_when_key_missing(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
watched = Asset(name="my_asset", uri="s3://bucket/data")
captured = {}
@@ -5718,7 +5720,7 @@ class TestTaskInstanceStateOperations:
assert captured["result"] == "2026-01-01T00:00:00+00:00"
- def test_asset_state_delete(self, create_runtime_ti,
mock_supervisor_comms):
+ def test_asset_state_store_delete(self, create_runtime_ti,
mock_supervisor_comms):
watched = Asset(name="my_asset", uri="s3://bucket/data")
class WatcherOperator(BaseOperator):
@@ -5735,7 +5737,7 @@ class TestTaskInstanceStateOperations:
DeleteAssetStateStoreByName(name="my_asset", key="watermark")
)
- def test_asset_state_clear(self, create_runtime_ti, mock_supervisor_comms):
+ def test_asset_state_store_clear(self, create_runtime_ti,
mock_supervisor_comms):
watched = Asset(name="my_asset", uri="s3://bucket/data")
class WatcherOperator(BaseOperator):
@@ -5750,7 +5752,7 @@ class TestTaskInstanceStateOperations:
mock_supervisor_comms.send.assert_any_call(ClearAssetStateStoreByName(name="my_asset"))
- def test_asset_state_uri_ref_inlet(self, create_runtime_ti,
mock_supervisor_comms):
+ def test_asset_state_store_uri_ref_inlet(self, create_runtime_ti,
mock_supervisor_comms):
watched = AssetUriRef(uri="s3://bucket/data")
class WatcherOperator(BaseOperator):
@@ -5771,7 +5773,7 @@ class TestTaskInstanceStateOperations:
GetAssetStateStoreByUri(uri="s3://bucket/data", key="watermark")
)
- def test_asset_state_alias_as_inlet(self, create_runtime_ti,
mock_supervisor_comms):
+ def test_asset_state_store_alias_as_inlet(self, create_runtime_ti,
mock_supervisor_comms):
alias = AssetAlias(name="my_alias")
resolved = Asset(name="resolved_asset", uri="s3://bucket/resolved")
@@ -5796,7 +5798,7 @@ class TestTaskInstanceStateOperations:
SetAssetStateStoreByName(name="resolved_asset", key="watermark",
value="2026-05-01")
)
- def test_asset_state_alias_inlet_no_resolved_assets(self,
create_runtime_ti, mock_supervisor_comms):
+ def test_asset_state_store_alias_inlet_no_resolved_assets(self,
create_runtime_ti, mock_supervisor_comms):
alias = AssetAlias(name="empty_alias")
class WatcherOperator(BaseOperator):
@@ -5815,7 +5817,7 @@ class TestTaskInstanceStateOperations:
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
- def test_asset_state_keyed_access_single_inlet(self, create_runtime_ti,
mock_supervisor_comms):
+ def test_asset_state_store_keyed_access_single_inlet(self,
create_runtime_ti, mock_supervisor_comms):
watched = Asset(name="my_asset", uri="s3://bucket/data")
class WatcherOperator(BaseOperator):
@@ -5833,7 +5835,7 @@ class TestTaskInstanceStateOperations:
SetAssetStateStoreByName(name="my_asset", key="watermark",
value="2026-05-01")
)
- def test_asset_state_multi_inlet(self, create_runtime_ti,
mock_supervisor_comms):
+ def test_asset_state_store_multi_inlet(self, create_runtime_ti,
mock_supervisor_comms):
asset_a = Asset(name="asset_a", uri="s3://bucket/a")
asset_b = Asset(name="asset_b", uri="s3://bucket/b")
@@ -5855,7 +5857,7 @@ class TestTaskInstanceStateOperations:
SetAssetStateStoreByName(name="asset_b", key="watermark_b",
value="2026-05-02")
)
- def test_asset_state_set_sends_reference_via_custom_backend(
+ def test_asset_state_store_set_sends_reference_via_custom_backend(
self, create_runtime_ti, mock_supervisor_comms
):
"""When a worker backend is configured, asset state set() sends a
reference, not the actual value."""