cfowler1woolworths opened a new issue, #37289:
URL: https://github.com/apache/beam/issues/37289

   ### What happened?
   
   <details>
     <summary>Version details</summary>  
   
   Python 3.13.9   
   Java OpenJDK Runtime Environment Temurin-17.0.17+10 (build 17.0.17+10)   
   Beam 2.70.0   
   
   
   ```
   poetry run pip show apache-beam
   Name: apache-beam
   Version: 2.70.0
   Summary: Apache Beam SDK for Python
   Home-page: https://beam.apache.org
   Author: Apache Software Foundation
   Author-email: [email protected]
   License: Apache License, Version 2.0
   Location: 
C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages
   Requires: beartype, cryptography, fastavro, fasteners, grpcio, httplib2, 
jsonpickle, numpy, objsize, packaging, proto-plus, protobuf, pyarrow, 
pyarrow-hotfix, pymongo, python-dateutil, pytz, pyyaml, requests, 
sortedcontainers, typing-extensions, zstandard
   Required-by: 
   ```
   
   
   </details>
   
   <details>
     <summary>Traceback</summary>
   ```
   Traceback (most recent call last):
     File "<frozen runpy>", line 198, in _run_module_as_main
     File "<frozen runpy>", line 88, in _run_code
     File 
"C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py",
 line 234, in <module>
       run_pipeline(
       ~~~~~~~~~~~~^
           subscription="projects/your-project/subscriptions/your-subscription",
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       ...<5 lines>...
           ),
           ^^
       )
       ^
     File 
"C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py",
 line 229, in run_pipeline
       _ = all_errors | "Write to Dead Letter" >> 
ErrorMsgSink.create_sink(dead_letter_table=dead_letter_table)
           
~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py",
 line 139, in __or__
       return self.pipeline.apply(ptransform, self)
              ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 702, in apply
       return self.apply(
              ~~~~~~~~~~^
           transform.transform, pvalueish, label or transform.label)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 717, in apply
       return self.apply(transform, pvalueish)
              ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 797, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
              ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
              ~~~~~~~~~~~~~~~~^^^^^^^
     File 
"C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\error_msg_sink.py",
 line 69, in expand
       error_msgs
       ~~~~~~~~~~
       | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
       ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
       | "Write Errors to BigQuery"
       ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
       >> WriteToBigQuery(
       ~~~~~~~~~~~~~~~~~~~
       ...<6 lines>...
       )
       ~
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py",
 line 139, in __or__
       return self.pipeline.apply(ptransform, self)
              ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 702, in apply
       return self.apply(
              ~~~~~~~~~~^
           transform.transform, pvalueish, label or transform.label)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 717, in apply
       return self.apply(transform, pvalueish)
              ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 797, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
              ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
              ~~~~~~~~~~~~~~~~^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py",
 line 2384, in expand
       return pcoll | StorageWriteToBigQuery(
              ~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~
           table=self.table_reference,
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~
       ...<12 lines>...
           expansion_service=self.expansion_service)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py",
 line 139, in __or__
       return self.pipeline.apply(ptransform, self)
              ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 797, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
              ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
              ~~~~~~~~~~~~~~~~^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py",
 line 2740, in expand
       input_beam_rows
       ~~~~~~~~~~~~~~~
       | SchemaAwareExternalTransform(
       ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
           identifier=StorageWriteToBigQuery.IDENTIFIER,
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
       ...<15 lines>...
           }))
           ~~
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py",
 line 139, in __or__
       return self.pipeline.apply(ptransform, self)
              ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 797, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
              ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
              ~~~~~~~~~~~~~~~~^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py",
 line 509, in expand
       return pcolls | self._payload_builder.identifier() >> ExternalTransform(
              ~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
           common_urns.schematransform_based_expand.urn,
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
           payload_builder,
           ~~~~~~~~~~~~~~~~
           expansion_service)
           ~~~~~~~~~~~~~~~~~~
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py",
 line 139, in __or__
       return self.pipeline.apply(ptransform, self)
              ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 702, in apply
       return self.apply(
              ~~~~~~~~~~^
           transform.transform, pvalueish, label or transform.label)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 717, in apply
       return self.apply(transform, pvalueish)
              ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py",
 line 797, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 191, in apply
       return self.apply_PTransform(transform, input, options)
              ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py",
 line 195, in apply_PTransform
       return transform.expand(input)
              ~~~~~~~~~~~~~~~~^^^^^^^
     File 
"C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py",
 line 808, in expand
       if self._type_hints.output_types:
          ^^^^^^^^^^^^^^^^
   AttributeError: 'ExternalTransform' object has no attribute '_type_hints'. 
Did you mean: 'get_type_hints'?
   ```
   </details>
   
   
   <details>
   <summary>Snippets</summary>  
   
   ```
   poetry run python -m click_aggregator_service.click_aggregator.pipeline \
        --runner DirectRunner \
        --streaming \
        --no_pipeline_type_check
   ```
   
   ```
   if self.dead_letter_table:
               bq_schema: dict[str, Any] = {
                   "fields": [
                       {"name": "job_name", "type": "STRING", "mode": 
"NULLABLE"},
                       {"name": "transform", "type": "STRING", "mode": 
"NULLABLE"},
                       {"name": "error", "type": "STRING", "mode": "NULLABLE"},
                       {"name": "data", "type": "STRING", "mode": "NULLABLE"},
                       {"name": "timestamp", "type": "TIMESTAMP", "mode": 
"NULLABLE"},
                       {"name": "id", "type": "STRING", "mode": "NULLABLE"},
                   ]
               }
   
               # Write to BigQuery if table is provided
               _ = (
                   error_msgs
                   | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
                   | "Write Errors to BigQuery"
                   >> WriteToBigQuery(
                       table=self.dead_letter_table,
                       write_disposition=BigQueryDisposition.WRITE_APPEND,
                       create_disposition=BigQueryDisposition.CREATE_NEVER,
                       method=WriteToBigQuery.Method.STORAGE_WRITE_API,
                       use_at_least_once=True,
                       schema=cast(Any, bq_schema),
                   )
               )
   ```
   </details>
   
   Attempting to use `apache_beam.io.gcp.bigquery.WriteToBigQuery` fails and 
raises the above exception.
   Bypassing the failing check still causes pipeline failure, but on the Java 
side.
   Also checked Beam v2.69.0, still failed.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [x] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to