I am using the bigqueryio transform with the following struct to collect a 
data row:

type Record struct {
  source_service  bigquery.NullString
  // .. etc...
}

This works fine with the direct runner, but with the Dataflow runner I get 
the following exception:
 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -41: execute failed: bigquery: schema field source_service of type STRING is not assignable to struct field source_service of type struct { StringVal string; Valid bool }
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:55)
        at com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
        at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        at com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
        at com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:391)
        at com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:360)
        at com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:288)
        at com.google.cloud.dataflow.worker.DataflowRunnerHarness.start(DataflowRunnerHarness.java:179)
        at com.google.cloud.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:107)
        Suppressed: java.lang.IllegalStateException: Already closed.
                at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:97)
                at com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.abort(RemoteGrpcPortWriteOperation.java:93)
                at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:89)
                ... 6 more

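It looks like the bigquery package is failing to recognize the nullable type 
NullString and is instead attempting a plain assignment. Note that the error 
reports the field's type as the anonymous struct { StringVal string; Valid bool } 
rather than as bigquery.NullString. Could it be that some of the type 
information has been lost on the way to the worker, preventing the bigquery 
package from identifying and handling NullString properly?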
