damccorm opened a new issue, #19998:
URL: https://github.com/apache/beam/issues/19998

   ```
   
   python repro.py \
     --project=CHANGEME \
     --runner=DataflowRunner \
     --temp_location=gs://change-me/bshi/tmp \
     --staging_location=gs://change-me/bshi/stg \
     --experiment=beam_fn_api \
     --save_main_function
   
   ```
   
   The same repro code works with `--runner=Direct`. On Dataflow, the error is:
   ```
   
   java.util.concurrent.ExecutionException: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
           at org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:48)
           at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:87)
           at org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService$DeferredInboundDataClient.awaitCompletion(BeamFnDataGrpcService.java:134)
           at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.finish(RemoteGrpcPortReadOperation.java:83)
           at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
           at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
           at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
           at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
           at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
           at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:195)
           at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:123)
           Suppressed: java.lang.IllegalStateException: Already closed.
                   at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:93)
                   at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:220)
                   at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:91)
                   ... 6 more
   Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.beam.sdk.values.KV
           at org.apache.beam.runners.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:72)
           at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
           at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation.consumeOutput(RemoteGrpcPortReadOperation.java:103)
           at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:78)
           at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31)
           at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:138)
           at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           at java.lang.Thread.run(Thread.java:748)
   
   ```
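
For readers unfamiliar with the notation: `[B` in the trace above is the JVM's runtime name for `byte[]`, so the worker's `ReifyTimestampAndWindowsParDoFn` received a raw byte array where it expected a `KV`. The following is only an illustrative sketch for reading such messages; `decode_jvm_name` is a hypothetical helper written for this note and is not part of Beam or Dataflow.

```python
# Hypothetical helper (not part of Beam): translate JVM runtime class names,
# as printed in ClassCastException messages, into Java source syntax.
_PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean",
}


def decode_jvm_name(name):
    """Return e.g. 'byte[]' for '[B'; non-array names are returned unchanged."""
    dims = len(name) - len(name.lstrip("["))  # one '[' per array dimension
    if dims == 0:
        return name  # ordinary class name, e.g. org.apache.beam.sdk.values.KV
    element = name[dims:]
    if element in _PRIMITIVES:
        base = _PRIMITIVES[element]
    elif element.startswith("L") and element.endswith(";"):
        base = element[1:-1]  # object arrays look like '[Ljava.lang.String;'
    else:
        raise ValueError("not a JVM array name: %r" % name)
    return base + "[]" * dims


if __name__ == "__main__":
    print(decode_jvm_name("[B"))
```

Applied to the two types in the exception, the helper maps `[B` to `byte[]` and leaves `org.apache.beam.sdk.values.KV` as is.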
   
    
   ```
   
   import logging
   import sys

   import apache_beam
   import apache_beam.io.gcp.bigquery_file_loads


   def main():
       logging.getLogger().setLevel(logging.DEBUG)
       options = apache_beam.options.pipeline_options.PipelineOptions(flags=sys.argv)

       with apache_beam.Pipeline(options=options) as p:
           (
               p
               | apache_beam.Create(
                   [
                       {"some_str": "hello", "some_int": 1,},
                       {"some_str": "world", "some_int": 2,},
                   ]
               )
               | apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads(
                   options.view_as(
                       apache_beam.options.pipeline_options.GoogleCloudOptions
                   ).project
                   + ":bo_test.flow",
                   create_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
                   write_disposition=apache_beam.io.gcp.bigquery.BigQueryDisposition.WRITE_TRUNCATE,
                   schema={
                       "fields": [
                           {"name": "some_str", "type": "STRING"},
                           {"name": "some_int", "type": "INTEGER"},
                       ]
                   },
               )
           )


   if __name__ == "__main__":
       main()
   
   ```
   
   
   Imported from Jira [BEAM-9192](https://issues.apache.org/jira/browse/BEAM-9192). Original Jira may contain additional context.
   Reported by: bshi.

