mouyang opened a new issue, #24870:
URL: https://github.com/apache/beam/issues/24870
### What happened?
I tried to run a pipeline that ran a SQL query with a BQ DATE field in the
result set with DataflowRunner and it resulted in an encoding error. The
exception message suggests that the correct coder was chosen (The Date logical
type resolves to a Long) but the value resolution doesn't appear to happen with
DataflowRunner. It did with DirectRunner.
Affected Runners: DataflowRunner (DirectRunner with 2.43.0 and previous
versions was fine)
Affected Versions: 2.43.0 (2.41.0, 2.42.0 do not appear to exhibit this
behaviour.)
Sample Code
```
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.io.File;
import java.util.Arrays;
import java.util.stream.Collectors;
public class TestPipeline {
public static void main(String[] args) {
PipelineOptionsFactory.register(DataflowPipelineOptions.class);
DataflowPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setFilesToStage(
Arrays.stream(System.getProperty("java.class.path").
split(File.pathSeparator)).
map(entry -> (new
File(entry)).toString()).collect(Collectors.toList()));
String testQuery = "select cast(\"2022-12-21\" as date) dt";
Pipeline pipeline = Pipeline.create(options);
PCollection<TableRow> tableRows =
pipeline.apply(BigQueryIO.readTableRowsWithSchema()
.fromQuery(testQuery)
.usingStandardSql()
);
PCollection<Row> rows =
tableRows.apply(MapElements.into(TypeDescriptors.rows())
.via(tableRows.getToRowFunction()))
.setRowSchema(tableRows.getSchema());
rows.apply("println", ParDo.of(new DoFn<Row, Row>() {
@ProcessElement
public void processElement(@Element Row row) {
System.out.println("println row" + row);
}
})).setRowSchema(tableRows.getSchema());
pipeline.run();
}
}
```
Exception
```
Error message from worker: java.lang.IllegalArgumentException: Unable to
encode element 'GenericData{classInfo=[f], {dt=2022-12-21}}' with coder
'SchemaCoder<Schema: Fields:
Field{name=dt, description=, type=LOGICAL_TYPE, options={{}}}
Encoding positions:
{dt=0}
Options:{{}}UUID: 9622aec6-90b4-4021-b112-a7e4d2abecef UUID:
9622aec6-90b4-4021-b112-a7e4d2abecef delegateCoder:
org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH@3ef9d47a'.
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
Caused by: java.lang.ClassCastException: java.time.LocalDate cannot be cast
to java.lang.Long
org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:35)
org.apache.beam.sdk.schemas.SchemaCoderHelpers$LogicalTypeCoder.encode(SchemaCoderHelpers.java:89)
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:333)
org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH.encode(Unknown
Source)
org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH.encode(Unknown
Source)
org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]