Python 3.7.5
Beam 2.17

I've used both WriteToBigQuery and BigQueryBatchFileLoads successfully
with DirectRunner.  I've boiled the issue down to a small reproducible
case (attached as repro-direct.py; a rough sketch of the WriteToBigQuery
variant follows the query output below).  The following works great:

$ pipenv run python repro-direct.py \
  --project=CHANGEME \
  --runner=DirectRunner \
  --temp_location=gs://beam-to-bq--tmp/bshi/tmp \
  --staging_location=gs://beam-to-bq--tmp/bshi/stg \
  --experiments=beam_fn_api \
  --save_main_session

(NB: I assume --experiments=beam_fn_api is a no-op with DirectRunner.)

$ bq --project_id=CHANGEME query 'select * from bshi_test.direct'
Waiting on bqjob_r53488a6b9ed6cb20_0000016fa05fb570_1 ... (0s) Current status: DONE
+----------+----------+
| some_str | some_int |
+----------+----------+
| hello    |        1 |
| world    |        2 |
+----------+----------+
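
For reference, the WriteToBigQuery variant mentioned above looks roughly
like the following.  This is a sketch rather than the attached repro: it
assumes the same CHANGEME project, the same bshi_test.direct table and
schema, and WriteToBigQuery's default insert method.

import sys

import apache_beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=sys.argv)
with apache_beam.Pipeline(options=options) as p:
    (
        p
        | apache_beam.Create(
            [
                {"some_str": "hello", "some_int": 1},
                {"some_str": "world", "some_int": 2},
            ]
        )
        # Same destination and dispositions as the attached repro; only the
        # write transform differs.
        | WriteToBigQuery(
            "CHANGEME:bshi_test.direct",
            schema="some_str:STRING,some_int:INTEGER",
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
        )
    )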

When I change to --runner=DataflowRunner, I get a cryptic Java error
with no other useful Python feedback (full trace attached as
class_cast_exception.txt):

apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException:
Dataflow pipeline failed. State: FAILED, Error:
java.util.concurrent.ExecutionException: java.lang.ClassCastException:
[B cannot be cast to org.apache.beam.sdk.values.KV
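
For clarity, the failing invocation is the same command as above with
only the runner changed (nothing else is assumed to differ):

$ pipenv run python repro-direct.py \
  --project=CHANGEME \
  --runner=DataflowRunner \
  --temp_location=gs://beam-to-bq--tmp/bshi/tmp \
  --staging_location=gs://beam-to-bq--tmp/bshi/stg \
  --experiments=beam_fn_api \
  --save_main_session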

On a hunch, I removed --experiments=beam_fn_api, and DataflowRunner then
works fine.  Am I doing something against the intent of the SDK?

Happy to share access to the GCP project if that would assist in debugging.

class_cast_exception.txt:

INFO:root:Job 2020-01-13_11_49_07-14827785953089659312 is in state JOB_STATE_FAILED
Traceback (most recent call last):
  File "repro.py", line 35, in <module>
    main()
  File "repro.py", line 27, in main
    {"name": "some_int", "type": "INTEGER"},
  File "/Users/bo/.local/share/virtualenvs/bsmining-_shFK90Q/lib/python3.7/site-packages/apache_beam/pipeline.py", line 426, in __exit__
    self.run().wait_until_finish()
  File "/Users/bo/.local/share/virtualenvs/bsmining-_shFK90Q/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1377, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:87)
        at org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService$DeferredInboundDataClient.awaitCompletion(BeamFnDataGrpcService.java:134)
        at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.finish(RemoteGrpcPortReadOperation.java:83)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
        at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
        at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
        at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
        at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
        at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:195)
        at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:123)
        Suppressed: java.lang.IllegalStateException: Already closed.
                at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:93)
                at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:220)
                at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:91)
                ... 6 more
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
        at org.apache.beam.runners.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:72)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.consumeOutput(RemoteGrpcPortReadOperation.java:103)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:78)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31)
        at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:138)
        at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

repro-direct.py:

import logging
import sys

import apache_beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions


def main():
    logging.getLogger().setLevel(logging.DEBUG)
    # Pipeline options come straight from the command line (runner, project,
    # temp/staging locations, experiments, etc.).
    options = PipelineOptions(flags=sys.argv)
    project = options.view_as(GoogleCloudOptions).project

    with apache_beam.Pipeline(options=options) as p:
        (
            p
            | apache_beam.Create(
                [
                    {"some_str": "hello", "some_int": 1},
                    {"some_str": "world", "some_int": 2},
                ]
            )
            # Write via BigQuery file loads (stage files to GCS, then issue a
            # load job) into <project>:bshi_test.direct.
            | BigQueryBatchFileLoads(
                project + ":bshi_test.direct",
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
                schema={
                    "fields": [
                        {"name": "some_str", "type": "STRING"},
                        {"name": "some_int", "type": "INTEGER"},
                    ]
                },
            )
        )


if __name__ == "__main__":
    main()
