faisalhasnain opened a new issue, #27856:
URL: https://github.com/apache/beam/issues/27856

   ### What happened?
   
   I was reading from Pub/Sub and writing to BigQuery after transforming the messages. Here is my Clojure code:
   ```
   (-> pipeline
       (th/apply! table
                  (-> (PubsubIO/readMessages)
                      (.fromSubscription (format "projects/%s/subscriptions/%s" project table-sub))
                      (th/with-name "reading pubsub stream"))
                  (-> (Window/into (FixedWindows/of (Duration/standardMinutes 1)))
                      (th/with-name "divide into windows"))
                  (th/partial "convert pubsub messages to bigquery cdc row mutations" #'pubsub-msg->row-mutation table fields)
                  (-> (BigQueryIO/applyRowMutations)
                      (.to (format "%s:%s.%s" project dataset table))
                      (.withCreateDisposition BigQueryIO$Write$CreateDisposition/CREATE_NEVER)
                      (.withWriteDisposition BigQueryIO$Write$WriteDisposition/WRITE_APPEND)
                      (.withMethod BigQueryIO$Write$Method/STORAGE_API_AT_LEAST_ONCE)
                      (.optimizedWrites)
                      (th/with-name "write to bigquery")))
       (.getFailedStorageApiInserts)
       (th/apply! table (-> #'log-bq-errors
                            (th/with-name "log bigquery errors"))))
   ```
   However, I was intermittently getting the following error:
   ```
   ERROR 2023-08-04T15:06:18.770Z: java.lang.RuntimeException: Execution failure from: TFn{#'tracking.postgres-to-bigquery-replication.core/pubsub-msg->row-mutation}
           thurber.java.TDoFn.execute(TDoFn.java:71)
           thurber.java.TDoFn.processElement(TDoFn.java:26)
   Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot output with timestamp 2023-08-03T20:06:31.815Z. Output timestamps must be no earlier than the timestamp of the current input or timer (2023-08-03T20:06:32.236Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
           org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
           org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiConvertMessages$ConvertMessagesDoFn.processElement(StorageApiConvertMessages.java:161)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiConvertMessages$ConvertMessagesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.AssignWindowsParDoFnFactory$AssignWindowsParDoFn.processElement(AssignWindowsParDoFnFactory.java:115)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
           org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)
           org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
           thurber.java.TDoFn.execute(TDoFn.java:66)
           thurber.java.TDoFn.processElement(TDoFn.java:26)
           thurber.java.TDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.AssignWindowsParDoFnFactory$AssignWindowsParDoFn.processElement(AssignWindowsParDoFnFactory.java:115)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
           org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
           org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
           org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
           org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
           org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
           org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
           org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1404)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:154)
           org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1044)
           org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
           java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
           java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
           java.base/java.lang.Thread.run(Thread.java:833)
   Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2023-08-03T20:06:31.815Z. Output timestamps must be no earlier than the timestamp of the current input or timer (2023-08-03T20:06:32.236Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:259)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:85)
           org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:429)
           org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$8(StorageApiWriteUnshardedRecords.java:624)
           org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:263)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:856)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushIfNecessary(StorageApiWriteUnshardedRecords.java:830)
           org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.process(StorageApiWriteUnshardedRecords.java:953)
   ```
   Any ideas on how to resolve this? It seems the BigQuery Storage API sink is not handling timestamps correctly.
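
To make the rule quoted in the error message concrete, here is a minimal standalone sketch of the lower-bound check it describes, fed with the two timestamps from the log. It uses only `java.time`; the class name `TimestampSkewCheck` and the method `isAllowed` are hypothetical names for illustration, and this is not Beam's actual `SimpleDoFnRunner.checkTimestamp` code. The upper bound mentioned in the message is left out. Going by the last frames of the trace, the rejected `outputWithTimestamp` call appears to come from the flush in `StorageApiWriteUnshardedRecords`, not from my own function.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSkewCheck {
    // Hypothetical helper: true when the output timestamp satisfies
    // output >= input - allowedSkew, the lower bound quoted in the error message.
    static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Timestamps copied from the error message above.
        Instant input = Instant.parse("2023-08-03T20:06:32.236Z");
        Instant output = Instant.parse("2023-08-03T20:06:31.815Z");

        // How far the output lies behind the current input element.
        System.out.println(Duration.between(output, input).toMillis()); // prints 421

        // With the reported skew of 0 ms the output is rejected.
        System.out.println(isAllowed(input, output, Duration.ZERO)); // prints false

        // A skew of at least 421 ms would let this particular output through.
        System.out.println(isAllowed(input, output, Duration.ofMillis(421))); // prints true
    }
}
```

So in this occurrence the output was 421 ms earlier than the input element being processed when the flush happened, which a skew of 0 ms does not permit.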
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner

