cfowler1woolworths opened a new issue, #37289:
URL: https://github.com/apache/beam/issues/37289
### What happened?
<details>
<summary>Version details</summary>

- Python 3.13.9
- Java: OpenJDK Runtime Environment Temurin-17.0.17+10 (build 17.0.17+10)
- Beam 2.70.0

```
poetry run pip show apache-beam
Name: apache-beam
Version: 2.70.0
Summary: Apache Beam SDK for Python
Home-page: https://beam.apache.org
Author: Apache Software Foundation
Author-email: [email protected]
License: Apache License, Version 2.0
Location: C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages
Requires: beartype, cryptography, fastavro, fasteners, grpcio, httplib2, jsonpickle, numpy, objsize, packaging, proto-plus, protobuf, pyarrow, pyarrow-hotfix, pymongo, python-dateutil, pytz, pyyaml, requests, sortedcontainers, typing-extensions, zstandard
Required-by:
```
</details>
<details>
<summary>Traceback</summary>
```
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py", line 234, in <module>
run_pipeline(
~~~~~~~~~~~~^
subscription="projects/your-project/subscriptions/your-subscription",
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<5 lines>...
),
^^
)
^
File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py", line 229, in run_pipeline
_ = all_errors | "Write to Dead Letter" >> ErrorMsgSink.create_sink(dead_letter_table=dead_letter_table)
~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
return self.pipeline.apply(ptransform, self)
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply
return self.apply(
~~~~~~~~~~^
transform.transform, pvalueish, label or transform.label)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply
return self.apply(transform, pvalueish)
~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
return self.apply_PTransform(transform, input, options)
~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
return transform.expand(input)
~~~~~~~~~~~~~~~~^^^^^^^
File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\error_msg_sink.py", line 69, in expand
error_msgs
~~~~~~~~~~
| "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
| "Write Errors to BigQuery"
^~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> WriteToBigQuery(
~~~~~~~~~~~~~~~~~~~
...<6 lines>...
)
~
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
return self.pipeline.apply(ptransform, self)
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply
return self.apply(
~~~~~~~~~~^
transform.transform, pvalueish, label or transform.label)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply
return self.apply(transform, pvalueish)
~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
return self.apply_PTransform(transform, input, options)
~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
return transform.expand(input)
~~~~~~~~~~~~~~~~^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2384, in expand
return pcoll | StorageWriteToBigQuery(
~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~
table=self.table_reference,
~~~~~~~~~~~~~~~~~~~~~~~~~~~
...<12 lines>...
expansion_service=self.expansion_service)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
return self.pipeline.apply(ptransform, self)
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
return self.apply_PTransform(transform, input, options)
~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
return transform.expand(input)
~~~~~~~~~~~~~~~~^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2740, in expand
input_beam_rows
~~~~~~~~~~~~~~~
| SchemaAwareExternalTransform(
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
identifier=StorageWriteToBigQuery.IDENTIFIER,
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
...<15 lines>...
}))
~~
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
return self.pipeline.apply(ptransform, self)
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
return self.apply_PTransform(transform, input, options)
~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
return transform.expand(input)
~~~~~~~~~~~~~~~~^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py", line 509, in expand
return pcolls | self._payload_builder.identifier() >> ExternalTransform(
~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
common_urns.schematransform_based_expand.urn,
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
payload_builder,
~~~~~~~~~~~~~~~~
expansion_service)
~~~~~~~~~~~~~~~~~~
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
return self.pipeline.apply(ptransform, self)
~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply
return self.apply(
~~~~~~~~~~^
transform.transform, pvalueish, label or transform.label)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply
return self.apply(transform, pvalueish)
~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
return self.apply_PTransform(transform, input, options)
~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
return transform.expand(input)
~~~~~~~~~~~~~~~~^^^^^^^
File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py", line 808, in expand
if self._type_hints.output_types:
^^^^^^^^^^^^^^^^
AttributeError: 'ExternalTransform' object has no attribute '_type_hints'. Did you mean: 'get_type_hints'?
```
</details>
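The last frame fails because `self._type_hints` is read directly on an `ExternalTransform` instance that does not have that attribute. The following is a generic, hypothetical Python illustration of how this class of error arises (an attribute that is only set in a base-class `__init__` which a subclass never runs) and of a lazily initializing accessor that tolerates it. `HintedBase`, `BrokenTransform`, and `SaferTransform` are made-up names; this is not Beam's code and not a confirmed root cause for this issue.

```python
# Hypothetical illustration only: not Apache Beam source code.


class HintedBase:
    def __init__(self):
        # Stand-in for per-instance type hints that the base __init__ creates.
        self._type_hints = {"output_types": None}

    def get_type_hints(self):
        # Public accessor: creates the attribute if __init__ was never run.
        if "_type_hints" not in self.__dict__:
            self._type_hints = {"output_types": None}
        return self._type_hints


class BrokenTransform(HintedBase):
    def __init__(self, label):
        # Bug: HintedBase.__init__ is never called, so _type_hints is never set.
        self.label = label

    def expand(self):
        # Direct attribute access raises AttributeError on this instance.
        return self._type_hints["output_types"]


class SaferTransform(BrokenTransform):
    def expand(self):
        # Going through the accessor tolerates the skipped base __init__.
        return self.get_type_hints()["output_types"]


if __name__ == "__main__":
    try:
        BrokenTransform("demo").expand()
    except AttributeError as exc:
        print(f"AttributeError: {exc}")
    print(SaferTransform("demo").expand())
```

The accessor pattern only masks a missing attribute; whether Beam's `ExternalTransform` is meant to have `_type_hints` populated at that point is for the maintainers to confirm.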
<details>
<summary>Snippets</summary>
```shell
poetry run python -m click_aggregator_service.click_aggregator.pipeline \
--runner DirectRunner \
--streaming \
--no_pipeline_type_check
```
```python
from typing import Any, cast

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

# Excerpt from ErrorMsgSink.expand() in error_msg_sink.py; `self` and
# `error_msgs` are provided by the enclosing PTransform.
if self.dead_letter_table:
    bq_schema: dict[str, Any] = {
        "fields": [
            {"name": "job_name", "type": "STRING", "mode": "NULLABLE"},
            {"name": "transform", "type": "STRING", "mode": "NULLABLE"},
            {"name": "error", "type": "STRING", "mode": "NULLABLE"},
            {"name": "data", "type": "STRING", "mode": "NULLABLE"},
            {"name": "timestamp", "type": "TIMESTAMP", "mode": "NULLABLE"},
            {"name": "id", "type": "STRING", "mode": "NULLABLE"},
        ]
    }
    # Write to BigQuery if table is provided
    _ = (
        error_msgs
        | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
        | "Write Errors to BigQuery"
        >> WriteToBigQuery(
            table=self.dead_letter_table,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            method=WriteToBigQuery.Method.STORAGE_WRITE_API,
            use_at_least_once=True,
            schema=cast(Any, bq_schema),
        )
    )
```
</details>
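When chasing the Java-side failure, one thing worth ruling out is a mismatch between the keys produced by `msg.model_dump()` and the field names declared in `bq_schema`. The helper below is a hypothetical debugging aid (`schema_key_diff`, `BQ_SCHEMA`, and the sample row are illustrative names and values, not part of Beam or of the reporter's code). It compares key names only and does not check types or modes.

```python
from typing import Any

# Same field list as the dead-letter schema in the snippet above.
BQ_SCHEMA: dict[str, Any] = {
    "fields": [
        {"name": "job_name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "transform", "type": "STRING", "mode": "NULLABLE"},
        {"name": "error", "type": "STRING", "mode": "NULLABLE"},
        {"name": "data", "type": "STRING", "mode": "NULLABLE"},
        {"name": "timestamp", "type": "TIMESTAMP", "mode": "NULLABLE"},
        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
    ]
}


def schema_key_diff(row: dict[str, Any], schema: dict[str, Any]) -> dict[str, list[str]]:
    """Compare a row's keys with the field names declared in a BigQuery-style schema dict."""
    declared = {field["name"] for field in schema["fields"]}
    present = set(row)
    return {
        # Declared in the schema but absent from the row.
        "missing": sorted(declared - present),
        # Present in the row but not declared in the schema.
        "unexpected": sorted(present - declared),
    }


if __name__ == "__main__":
    # Hypothetical row: "error_id" stands in for a model field the table lacks.
    sample_row = {
        "job_name": "click-aggregator",
        "transform": "Parse",
        "error": "ValueError",
        "data": "{}",
        "timestamp": None,
        "error_id": "abc123",
    }
    print(schema_key_diff(sample_row, BQ_SCHEMA))
    # {'missing': ['id'], 'unexpected': ['error_id']}
```

As an assumption about usage, the helper could be called from a `beam.Map` placed between the "Convert to Dict" and "Write Errors to BigQuery" steps to log any differences before the write.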
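Attempting to use `apache_beam.io.gcp.bigquery.WriteToBigQuery` with `method=WriteToBigQuery.Method.STORAGE_WRITE_API` fails while the pipeline is being constructed, raising the exception above.
Bypassing the failing check (`if self._type_hints.output_types:` in `apache_beam/transforms/external.py`, line 808) still causes the pipeline to fail, but on the Java side (the Storage Write API sink is a cross-language transform, as the `SchemaAwareExternalTransform` frame in the traceback shows).
I also tried Beam 2.69.0, and it fails there as well.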
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [x] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner