mouyang opened a new issue, #24870:
URL: https://github.com/apache/beam/issues/24870

   ### What happened?
   
   I tried to run a pipeline with DataflowRunner that executed a SQL query whose 
result set contained a BQ DATE field, and it failed with an encoding error.  The 
exception message suggests that the correct coder was chosen (the DATE logical 
type resolves to a Long), but the value conversion doesn't appear to happen with 
DataflowRunner.  It did with DirectRunner.
   
   
   Affected Runners: DataflowRunner (DirectRunner was fine with 2.43.0 and previous 
versions)
   Affected Versions: 2.43.0 (2.41.0 and 2.42.0 do not appear to exhibit this 
behaviour.)
   
   Sample Code
   
   ```
   import com.google.api.services.bigquery.model.TableRow;
   import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.MapElements;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.Row;
   import org.apache.beam.sdk.values.TypeDescriptors;
   
   import java.io.File;
   import java.util.Arrays;
   import java.util.stream.Collectors;
   
   public class TestPipeline {
       public static void main(String[] args) {
           PipelineOptionsFactory.register(DataflowPipelineOptions.class);
           DataflowPipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
           options.setFilesToStage(
                   Arrays.stream(System.getProperty("java.class.path").
                           split(File.pathSeparator)).
                           map(entry -> (new 
File(entry)).toString()).collect(Collectors.toList()));
   
   
           String testQuery = "select cast(\"2022-12-21\" as date) dt";
           Pipeline pipeline = Pipeline.create(options);
           PCollection<TableRow> tableRows = 
pipeline.apply(BigQueryIO.readTableRowsWithSchema()
                   .fromQuery(testQuery)
                   .usingStandardSql()
           );
           PCollection<Row> rows = 
tableRows.apply(MapElements.into(TypeDescriptors.rows())
                   .via(tableRows.getToRowFunction()))
                   .setRowSchema(tableRows.getSchema());
           rows.apply("println", ParDo.of(new DoFn<Row, Row>() {
               @ProcessElement
               public void processElement(@Element Row row) {
                   System.out.println("println row" + row);
               }
           })).setRowSchema(tableRows.getSchema());
           pipeline.run();
       }
   }
   ```
   
   Exception
   ```
   Error message from worker: java.lang.IllegalArgumentException: Unable to 
encode element 'GenericData{classInfo=[f], {dt=2022-12-21}}' with coder 
'SchemaCoder<Schema: Fields:
   Field{name=dt, description=, type=LOGICAL_TYPE, options={{}}}
   Encoding positions:
   {dt=0}
   Options:{{}}UUID: 9622aec6-90b4-4021-b112-a7e4d2abecef  UUID: 
9622aec6-90b4-4021-b112-a7e4d2abecef delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH@3ef9d47a'.
        
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
        
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
   Caused by: java.lang.ClassCastException: java.time.LocalDate cannot be cast 
to java.lang.Long
        org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:35)
        
org.apache.beam.sdk.schemas.SchemaCoderHelpers$LogicalTypeCoder.encode(SchemaCoderHelpers.java:89)
        
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:333)
        org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH.encode(Unknown 
Source)
        org.apache.beam.sdk.coders.Coder$ByteBuddy$0cxD4zOH.encode(Unknown 
Source)
        org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
        
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
        
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:403)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
        
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84)
        
org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
        
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
        
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
        
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
        
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
        
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
        
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
        java.util.concurrent.FutureTask.run(FutureTask.java:266)
        
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
   ```
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to