Re: [PR] Feature: Support bytes data values in xcom backend [airflow]
jolin1337 closed pull request #61777: Feature: Support bytes data values in xcom backend URL: https://github.com/apache/airflow/pull/61777 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
potiuk commented on PR #61777:
URL: https://github.com/apache/airflow/pull/61777#issuecomment-4043193150

@jolin1337 This PR has been converted to **draft** because it does not yet meet our [Pull Request quality criteria](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-quality-criteria).

**Issues found:**
- :x: **Pre-commit / static checks**: Failing: CI image checks / Static checks. Run `prek run --from-ref main` locally to find and fix issues. See [Pre-commit / static checks docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst).
- :x: **mypy (type checking)**: Failing: CI image checks / MyPy checks (mypy-providers). Run `prek --stage manual mypy-providers --all-files` locally to reproduce. You need `breeze ci-image build --python 3.10` for Docker-based mypy. See [mypy (type checking) docs](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#mypy-checks).
- :x: **Provider tests**: Failing: provider distributions tests / Compat 2.11.0:P3.10:, provider distributions tests / Compat 3.0.6:P3.10:, provider distributions tests / Compat 3.1.7:P3.10:, Postgres tests: providers / DB-prov:Postgres:14:3.10:common.compat,common.io,openl, MySQL tests: providers / DB-prov:MySQL:8.0:3.10:common.compat,common.io,openl (+2 more). Run provider tests with `breeze run pytest -xvs`. See [Provider tests docs](https://github.com/apache/airflow/blob/main/contributing-docs/12_provider_distributions.rst).

> **Note:** Your branch is **788 commits behind `main`**. Some check failures may be caused by changes in the base branch rather than by your PR. Please rebase your branch and push again to get up-to-date CI results.

**What to do next:**
- The comment informs you what you need to do.
- Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
- Maintainers will then proceed with a normal review.

Converting a PR to draft is **not** a rejection; it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively.

If you have questions, feel free to ask on the [Airflow Slack](https://s.apache.org/airflow-slack).
Nataneljpwd commented on PR #61777:
URL: https://github.com/apache/airflow/pull/61777#issuecomment-3954940327
Hello, I think it is a good idea to start a mailing list discussion on this topic, as GitHub discussions tend to get less attention.
jolin1337 commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2803104631
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -117,9 +117,13 @@ def serialize_value( # type: ignore[override]
         run_id: str | None = None,
         map_index: int | None = None,
     ) -> bytes | str:
-        # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if isinstance(value, bytes):
+            # Store raw bytes as-is
+            s_val_encoded = value
Review Comment:
Discussion can continue here:
https://github.com/apache/airflow/discussions/61843
To clarify, as stated in the discussion: I would rather support only raw
bytes and drop JSON support, but I would keep the JSON format for
compatibility reasons. Alternatively, it could be parameterized so that a DAG
or task can override this behaviour with a serializer of its choice.
jolin1337 commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2803010076
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -117,9 +117,13 @@ def serialize_value( # type: ignore[override]
         run_id: str | None = None,
         map_index: int | None = None,
     ) -> bytes | str:
-        # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if isinstance(value, bytes):
+            # Store raw bytes as-is
+            s_val_encoded = value
Review Comment:
Fair enough, I will start a discussion :)
You point out that we could do this manually in each task, but what is the
use case for this XCom backend in that case? I think it is convenient that it
exists both as a fallback, so that we do not accidentally store big files in
S3, and as a reliable way to standardize the S3 bucket paths for all DAGs
running in the Airflow instance.
Nataneljpwd commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2802926706
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -117,9 +117,13 @@ def serialize_value( # type: ignore[override]
         run_id: str | None = None,
         map_index: int | None = None,
     ) -> bytes | str:
-        # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if isinstance(value, bytes):
+            # Store raw bytes as-is
+            s_val_encoded = value
Review Comment:
I just think that it is a very specific use case. If you are talking about
video or audio files, they are quite large and are stored in S3 (hopefully, if
the custom XCom backend is configured). If the data is already stored in S3,
why not do the exact same thing the backend does, but manually? I think the
better way of storing raw bytes is to write them to S3 outside of XCom and
save the S3 object key as the XCom value (just like the backend does).
Maybe it is a good idea to implement such a feature, but a discussion could
clarify things a little, as I cannot speak for the majority. I think it could
be a very nice feature, but first I would suggest asking the community and
holding a discussion about this topic.
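A minimal sketch of the manual approach described in this comment, assuming a local temporary directory stands in for the S3 bucket. `store_blob` and `load_blob` are hypothetical helper names used only for illustration; they are not Airflow or provider APIs.

```python
import hashlib
import tempfile
from pathlib import Path


def store_blob(root: Path, payload: bytes) -> str:
    """Write the payload under `root` and return a small, JSON-friendly key."""
    # Content-addressed name: an assumption for this sketch, not a provider convention.
    key = hashlib.sha256(payload).hexdigest() + ".bin"
    (root / key).write_bytes(payload)
    return key


def load_blob(root: Path, key: str) -> bytes:
    """Read the payload back from the key that was passed between tasks."""
    return (root / key).read_bytes()


# Stand-in for the bucket; a real task would write to object storage instead.
root = Path(tempfile.mkdtemp())
frame = bytes(range(256))

# The upstream task would return only this short string as its XCom value.
xcom_value = store_blob(root, frame)

# The downstream task resolves the key back into the original bytes.
restored = load_blob(root, xcom_value)
```

Only the key travels through XCom here, so the value stays JSON-serializable whatever the payload contains.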
jolin1337 commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2802672351
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -117,9 +117,13 @@ def serialize_value( # type: ignore[override]
         run_id: str | None = None,
         map_index: int | None = None,
     ) -> bytes | str:
-        # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if isinstance(value, bytes):
+            # Store raw bytes as-is
+            s_val_encoded = value
Review Comment:
Would you say this is the recommended and preferred way of storing raw bytes
data, to encode and decode on each task run? It seems to me that this will add
unnecessary overhead, but that is just my opinion. Does this require some sort
of discussion before it is implemented in the common.io provider? If more
people would like to see this feature, it might be a good idea to implement
it; if it is only us, we could solve it as you suggest.
Nataneljpwd commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2799073229
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -117,9 +117,13 @@ def serialize_value( # type: ignore[override]
         run_id: str | None = None,
         map_index: int | None = None,
     ) -> bytes | str:
-        # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if isinstance(value, bytes):
+            # Store raw bytes as-is
+            s_val_encoded = value
Review Comment:
Why not just hex/b64 encode the bytes? Why does it have to be bytes or json?
What is the use case for this?
Why not choose a different format to support both rather than json or bytes?
Raw bytes can always be encoded without adding much to the data size
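For reference, a minimal sketch of the base64 alternative raised here, using only the standard library. The `__bytes_b64__` wrapper key and the `encode_value` / `decode_value` helpers are assumptions made for illustration; they are not part of `XComEncoder` or the common.io provider.

```python
import base64
import json

# Hypothetical marker key; a real implementation would need an agreed-upon tag.
BYTES_TAG = "__bytes_b64__"


def encode_value(value) -> str:
    """Serialize to JSON text, wrapping bytes in a tagged base64 string."""
    if isinstance(value, bytes):
        return json.dumps({BYTES_TAG: base64.b64encode(value).decode("ascii")})
    return json.dumps(value)


def decode_value(text: str):
    """Inverse of encode_value: unwrap tagged bytes, pass other JSON through."""
    obj = json.loads(text)
    if isinstance(obj, dict) and set(obj) == {BYTES_TAG}:
        return base64.b64decode(obj[BYTES_TAG])
    return obj


raw = bytes(range(256)) * 3  # 768 bytes of arbitrary binary data
encoded = encode_value(raw)
round_tripped = decode_value(encoded)
```

With this scheme everything stays valid JSON, so the reader never has to guess the format. The cost is size: base64 emits 4 characters per 3 input bytes (about a third larger), and hex doubles the size. A user dict whose only key happens to be the tag would be misread as bytes, which is a limitation of this sketch.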
jolin1337 commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2797745077
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -171,8 +174,9 @@ def deserialize_value(result) -> Any:
             return data
         try:
             with path.open(mode="rb", compression="infer") as f:
-                return json.load(f, cls=XComDecoder)
-        except (FileNotFoundError, TypeError, ValueError):
+                data = f.read()
+                return json.loads(data, cls=XComDecoder)
+        except (FileNotFoundError, TypeError, ValueError, json.decoder.JSONDecodeError):
Review Comment:
Yes, this is intentional.
jolin1337 commented on PR #61777:
URL: https://github.com/apache/airflow/pull/61777#issuecomment-3889737618
The reason for this change is to support processing binary data formats such as in-memory images, video, and audio.
jolin1337 commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2797743588
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -118,8 +118,11 @@ def serialize_value( # type: ignore[override]
         map_index: int | None = None,
     ) -> bytes | str:
         # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if not isinstance(value, bytes):
+            s_val = json.dumps(value, cls=XComEncoder)
+            s_val_encoded = s_val.encode("utf-8")
+        else:
+            s_val_encoded = value
Review Comment:
I agree
SameerMesiah97 commented on code in PR #61777:
URL: https://github.com/apache/airflow/pull/61777#discussion_r2795094967
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -171,8 +174,9 @@ def deserialize_value(result) -> Any:
             return data
         try:
             with path.open(mode="rb", compression="infer") as f:
-                return json.load(f, cls=XComDecoder)
-        except (FileNotFoundError, TypeError, ValueError):
+                data = f.read()
+                return json.loads(data, cls=XComDecoder)
+        except (FileNotFoundError, TypeError, ValueError, json.decoder.JSONDecodeError):
Review Comment:
What about binaries that are valid inputs for `json.loads`? For example,
`b'"Hello"'`. It seems these will not be stored as bytes. If that is indeed
the case, I would suggest adding a comment like below:
`# Prefer JSON decoding; fall back to raw bytes if it fails.`
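The concern can be reproduced with the standard library alone. The sketch below assumes a simplified, hypothetical `read_back` helper that mirrors the JSON-first fallback in the diff; it is not the provider's actual `deserialize_value` and does not use `XComDecoder`.

```python
import json


def read_back(stored: bytes):
    """Mimic a JSON-first reader that falls back to returning the raw bytes."""
    # Prefer JSON decoding; fall back to raw bytes if it fails.
    try:
        return json.loads(stored)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return stored


# Bytes that happen to be valid JSON do not come back as bytes.
looks_like_json = read_back(b'"Hello"')
looks_like_number = read_back(b"123")

# Bytes that are not valid UTF-8 JSON survive unchanged.
binary_header = read_back(b"\x89PNG\r\n")
```

Here `looks_like_json` is the `str` `"Hello"` and `looks_like_number` is the `int` `123`, so the round trip changes the type for any payload that parses as JSON, while the binary header is returned as the original bytes.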
##
providers/common/io/src/airflow/providers/common/io/xcom/backend.py:
##
@@ -118,8 +118,11 @@ def serialize_value( # type: ignore[override]
         map_index: int | None = None,
     ) -> bytes | str:
         # We will use this serialized value to write to the object store.
-        s_val = json.dumps(value, cls=XComEncoder)
-        s_val_encoded = s_val.encode("utf-8")
+        if not isinstance(value, bytes):
+            s_val = json.dumps(value, cls=XComEncoder)
+            s_val_encoded = s_val.encode("utf-8")
+        else:
+            s_val_encoded = value
Review Comment:
This is more of a stylistic nit that you don't have to implement, but since
raw bytes seem to be a special case, perhaps you could do:
```
if isinstance(value, bytes):
    # Store raw bytes as-is
    s_val_encoded = value
else:
    # Default JSON serialization path
    s_val = json.dumps(value, cls=XComEncoder)
    s_val_encoded = s_val.encode("utf-8")
```
This is more explicit.
boring-cyborg[bot] commented on PR #61777:
URL: https://github.com/apache/airflow/pull/61777#issuecomment-3885070517

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)

Here are some useful points:
- Pay attention to the quality of your code (ruff, mypy and type annotations). Our [prek-hooks](https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-prek-hooks) will help you with that.
- In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/airflow-core/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
- Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
- Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
- Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices).
- Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better.

In case of doubts contact the developers at:
Mailing List: [email protected]
Slack: https://s.apache.org/airflow-slack
