Re: Does the WithKeys transform enforce a reshuffle?
Also, I looked at the code: Reshuffle seems to do some GroupBy work internally, but I don't really need a GroupBy.

On Fri, Jan 19, 2024 at 9:35 AM hsy...@gmail.com wrote:
> ReShuffle is deprecated.
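On the GroupBy point: as far as I understand it, the grouping is exactly what makes Reshuffle useful here. A GroupByKey is a materialization point that Dataflow does not fuse across, so once elements carry many distinct keys, the steps after the grouping can run on other workers. In Beam Java this pattern is packaged as `Reshuffle.viaRandomKey()`. The plain-Java sketch below only models the idea, with no Beam dependency: tag each element with a random shard key, group by that key, then drop the key again. `RandomKeyReshuffle` and its methods are hypothetical names, and this is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical plain-Java model of a random-key reshuffle; this is not Beam code.
final class RandomKeyReshuffle {

    private RandomKeyReshuffle() {}

    // "WithKeys + GroupByKey" step: give every element a random key in
    // [0, numShards) and collect the elements that share a key.
    static <T> Map<Integer, List<T>> groupByRandomKey(List<T> input, int numShards, long seed) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        Random random = new Random(seed);
        Map<Integer, List<T>> groups = new TreeMap<>();
        for (T element : input) {
            int key = random.nextInt(numShards);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    // "Drop the keys" step: downstream sees the same elements again, but a
    // runner could have processed each group on a different worker.
    static <T> List<T> ungroup(Map<Integer, List<T>> groups) {
        List<T> out = new ArrayList<>();
        for (List<T> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }
}
```

The elements that come out are the same ones that went in, possibly in a different order; only their grouping, and therefore where a runner could process them, changes.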
Re: Does the WithKeys transform enforce a reshuffle?
ReShuffle is deprecated.

On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user wrote:
> I do not think it enforces a reshuffle, just from checking the doc here:
> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys
>
> Have you tried just adding a ReShuffle after PubsubLiteIO?
>
> On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com wrote:
>
>> Hey guys,
>>
>> I have a question: does the WithKeys transform enforce a reshuffle?
>>
>> My pipeline basically looks like this: PubsubLiteIO -> ParDo(..) -> ParDo()
>> -> BigQueryIO.write()
>>
>> The problem is that PubsubLiteIO -> ParDo(..) -> ParDo() are always fused
>> together. But the ParDo is expensive, and I want Dataflow to have more
>> workers working on it. What's the best way to do that?
>>
>> Regards,
Does the WithKeys transform enforce a reshuffle?
Hey guys,

I have a question: does the WithKeys transform enforce a reshuffle?

My pipeline basically looks like this: PubsubLiteIO -> ParDo(..) -> ParDo() -> BigQueryIO.write()

The problem is that PubsubLiteIO -> ParDo(..) -> ParDo() are always fused together. But the ParDo is expensive, and I want Dataflow to have more workers working on it. What's the best way to do that?

Regards,
ParDo(DoFn) with multiple context.output vs FlatMapElements
Hello,

I have a question. If I have a transform that emits one or many outputs (to the same collection) for each input, I can either do it with a ParDo + DoFn, calling context.output multiple times in the processElement method for each input, or do it with FlatMapElements. Is there any difference? Does Dataflow fuse the downstream transform automatically? Eventually I want more workers on the downstream transform because it needs to handle more data. How am I supposed to do that?

Regards,
Siyuan
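For comparison, the two styles express the same one-to-many mapping: a DoFn pushes each result to its output receiver, while FlatMapElements takes a function that returns an Iterable per input. As far as I know, FlatMapElements is itself expanded into a ParDo, so the runner should treat both the same way when it decides what to fuse. The plain-Java sketch below has no Beam dependency; `OneToMany` and its methods are hypothetical names that model the two styles with a `Consumer` and with a returned `List`, and both yield identical output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of two ways to write a one-to-many step; not Beam code.
final class OneToMany {

    private OneToMany() {}

    // "DoFn style": push each result to an output receiver, once per result
    // (the analogue of calling context.output(...) several times).
    static void splitEmitting(String line, Consumer<String> output) {
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                output.accept(word);
            }
        }
    }

    // "FlatMapElements style": return all results for one input as a collection.
    static List<String> splitReturning(String line) {
        return Arrays.stream(line.split(" "))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    // Apply the emitting style to every input and collect everything emitted.
    static List<String> runEmitting(List<String> inputs) {
        List<String> out = new ArrayList<>();
        for (String input : inputs) {
            splitEmitting(input, out::add);
        }
        return out;
    }

    // Apply the returning style to every input and flatten the returned lists.
    static List<String> runReturning(List<String> inputs) {
        List<String> out = new ArrayList<>();
        for (String input : inputs) {
            out.addAll(splitReturning(input));
        }
        return out;
    }
}
```

Because the choice between the two is mostly about readability, it should not by itself change how many workers the downstream step gets.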
pubsubliteio ack problem
In my application, PubsubLiteIO never seems to ack the messages, and the data lateness keeps building up. My question is: how does Dataflow know when to ack a message? How does the engine even know when the message has been processed?
How to set flow control for pubsubliteio?
How do I change the flow control config for PubsubLiteIO? I saw that the setting was taken out as part of https://issues.apache.org/jira/browse/BEAM-14129. But without setting up flow control correctly, my Beam app is running super slow when ingesting from Pub/Sub Lite, and I'm getting a NO_CLIENT_TOKEN error on the server side, which suggests increasing the flow control setting.
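As I understand the Pub/Sub Lite flow-control status, NO_CLIENT_TOKENS means the server has messages ready but the subscriber has not granted it enough outstanding-message or outstanding-byte allowance. If I remember the Pub/Sub Lite Java client correctly, that allowance is set through `FlowControlSettings` (`setBytesOutstanding`, `setMessagesOutstanding`). Whether and how PubsubLiteIO exposes it after BEAM-14129 is not claimed here. The toy model below only shows why a small allowance caps throughput; `FlowControlModel` is hypothetical and is not the client library or PubsubLiteIO API.

```java
// Hypothetical toy model of client-granted flow-control tokens; this is not the
// Pub/Sub Lite client library or the PubsubLiteIO API.
final class FlowControlModel {
    private long bytesAllowed;     // byte tokens granted by the client and not yet used
    private long messagesAllowed;  // message tokens granted by the client and not yet used

    FlowControlModel(long bytesOutstanding, long messagesOutstanding) {
        this.bytesAllowed = bytesOutstanding;
        this.messagesAllowed = messagesOutstanding;
    }

    // Server side: a message is delivered only if the client has granted enough
    // tokens. Returning false models being blocked on client tokens.
    boolean tryDeliver(long messageBytes) {
        if (messagesAllowed < 1 || bytesAllowed < messageBytes) {
            return false;
        }
        messagesAllowed--;
        bytesAllowed -= messageBytes;
        return true;
    }

    // Client side: once a message has been handled, its tokens are granted back.
    void release(long messageBytes) {
        messagesAllowed++;
        bytesAllowed += messageBytes;
    }
}
```

With an allowance of only a few messages, delivery stalls until the client finishes work and releases tokens, so throughput is bounded by the allowance rather than by the server.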
Re: pubsubliteio is super slow
Do you have a ticket? And in the Pub/Sub Lite metrics it shows NO_CLIENT_TOKENS.

On Tue, Dec 19, 2023 at 1:39 PM Nirav Patel wrote:
> We have. Yes, it is super slow. I tested the Python and Java IO versions as well,
> besides the Beam IO. We reported this problem to Google.
>
> On Tue, Dec 19, 2023 at 10:17 AM hsy...@gmail.com
> wrote:
>
>> Is anyone using Pub/Sub Lite? I find it super slow (5 messages/sec), and
>> there are no options for me to tune the performance.
pubsubliteio is super slow
Is anyone using Pub/Sub Lite? I find it super slow (5 messages/sec), and there are no options for me to tune the performance.
Re: Questions about writing to BigQuery using storage api
The "Caused by" section is already in the error message.

On Thu, Dec 7, 2023 at 10:47 AM Reuven Lax via user wrote:
> This is the stack trace of the rethrown exception. The log should also
> contain a "caused by" log somewhere detailing the original exception. Do
> you happen to have that?
>
> On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com wrote:
>
>> Here is the complete stack trace. It doesn't even hit my code, and it
>> happens consistently!
Re: Questions about writing to BigQuery using storage api
Here is the complete stack trace. It doesn't even hit my code, and it happens consistently!

Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
Caused by: java.lang.IllegalStateException
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

Regards,
Siyuan

On Wed, Dec 6, 2023 at 10:12 AM Ahmed Abualsaud wrote:
> Hey, can you provide the full stack trace for the error you're seeing?
> Also, is this happening consistently?
>
> *+1* to raising a Google ticket where we'll have more visibility.
>
> On Wed, Dec 6, 2023 at 11:33 AM John Casey wrote:
>
>> Hmm. It may be best if you raise a ticket with Google support for this. I
>> can inspect your job directly if you do that, and that will make this more
>> straightforward.
>>
>> On Wed, Dec 6, 2023 at 11:24 AM hsy...@gmail.com wrote:
>>
>>> I'm just using the Dataflow engine.
Re: Questions about writing to BigQuery using storage api
I'm just using the Dataflow engine.

On Wed, Dec 6, 2023 at 08:23 John Casey via user wrote:
> Well, that is odd. It looks like the underlying client is closed, which is
> unexpected.
>
> Do you see any retries in your pipeline? Also, what runner are you using?
>
> @Ahmed Abualsaud this might be interesting to
> you too
>
> On Tue, Dec 5, 2023 at 9:39 PM hsy...@gmail.com wrote:
>
>> I'm using version 2.51.0, and the configuration is like this:
>>
>> write
>>     .withoutValidation()
>>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>     .withExtendedErrorInfo()
>>     .withMethod(Write.Method.STORAGE_WRITE_API)
>>     .withTriggeringFrequency(Duration.standardSeconds(10))
>>     .withAutoSharding().optimizedWrites()
>>     .withFailedInsertRetryPolicy(retryTransientErrors());
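John's reading of the trace (an `IllegalStateException` from `Preconditions.checkState` inside `pin()`, which he interprets as the underlying client being closed) matches a familiar failure mode of reference-counted resources: taking a new reference after the last one was already released. The toy model below illustrates only that failure mode. `RefCountedClient` is a hypothetical class, and it is an assumption, not something confirmed in this thread, that `BigQueryServicesImpl` follows this exact pattern.

```java
// Hypothetical model of a reference-counted client; not Beam's BigQueryServicesImpl.
final class RefCountedClient {
    private int refCount = 1;        // the creator holds the initial reference
    private boolean closed = false;

    // Take an extra reference. Pinning a client that is already closed is a
    // programming error, so fail the same way a checkState(!closed) would.
    synchronized void pin() {
        if (closed) {
            throw new IllegalStateException("pin() called on a closed client");
        }
        refCount++;
    }

    // Release one reference; the last release closes the client.
    synchronized void unpin() {
        if (refCount <= 0) {
            throw new IllegalStateException("unpin() without a matching reference");
        }
        refCount--;
        if (refCount == 0) {
            closed = true;           // the underlying connection would be shut down here
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```

In a model like this, the error points at whatever released the last reference too early (for example a retry or cleanup path), not at the code that later calls `pin()`.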
Re: Questions about writing to BigQuery using storage api
I'm using version 2.51.0, and the configuration is like this:

write
    .withoutValidation()
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withExtendedErrorInfo()
    .withMethod(Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(10))
    .withAutoSharding().optimizedWrites()
    .withFailedInsertRetryPolicy(retryTransientErrors());

On Tue, Dec 5, 2023 at 11:20 AM John Casey via user wrote:
> Hi,
>
> Could you add some more detail? Which Beam version are you using?
>
>
> On Tue, Dec 5, 2023 at 1:52 PM hsy...@gmail.com wrote:
>
>> Does anyone have experience writing to BQ using the Storage Write API?
>>
>> I tried to use it because, according to the documentation, it is more efficient,
>> but I got the error below:
>>
>> 2023-12-05 04:01:29.741 PST
>> Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException
Questions about writing to BigQuery using storage api
Does anyone have experience writing to BQ using the Storage Write API?

I tried to use it because, according to the documentation, it is more efficient, but I got the error below:

2023-12-05 04:01:29.741 PST
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
Caused by: java.lang.IllegalStateException
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Watermark never progresses for Deduplicate transform
Hello, I'm using the https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html transform to dedup my data, but on the monitoring page I see that the watermark is not moving forward. Is that common for this transform? Thanks
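For background on what the transform has to track: Deduplicate remembers each element it has emitted for a configurable duration and drops repeats that arrive within that time. In Beam Java the duration and time domain are set with `withDuration(...)` and `withTimeDomain(...)`. The plain-Java sketch below models only that retention behavior, using explicit millisecond timestamps and an expiry check in place of timers. `TimedDeduplicator` is hypothetical, not Beam code, and it does not model how Dataflow computes or holds the watermark.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of time-bounded deduplication; not Beam's Deduplicate transform.
final class TimedDeduplicator<T> {
    private final long retentionMillis;
    // For each recently seen element, the time at which it may be forgotten.
    private final Map<T, Long> expiryByElement = new HashMap<>();

    TimedDeduplicator(long retentionMillis) {
        if (retentionMillis <= 0) {
            throw new IllegalArgumentException("retentionMillis must be positive");
        }
        this.retentionMillis = retentionMillis;
    }

    // Returns true if the element should be emitted, false if it repeats an
    // element seen within the retention period.
    boolean offer(T element, long nowMillis) {
        // Forget elements whose retention has elapsed (the role an expiry timer plays).
        expiryByElement.values().removeIf(expiry -> expiry <= nowMillis);
        if (expiryByElement.containsKey(element)) {
            return false;
        }
        expiryByElement.put(element, nowMillis + retentionMillis);
        return true;
    }

    // Number of elements currently remembered, i.e. the state being kept.
    int stateSize() {
        return expiryByElement.size();
    }
}
```

The longer the retention duration, the more state is kept and the longer each element's expiry is pending, which is worth keeping in mind when looking at the job's monitoring page.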
Question about metrics
Hi, I have a question about metrics. I want to use the Beam metrics API to send metrics to GCP Monitoring. Instead of collecting just simple numeric values, I also need to send labels along with them. Is there a way to do that? Thanks!
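One workaround, offered as an assumption rather than a documented Beam feature, is to fold the label values into the name passed to `Metrics.counter(namespace, name)`, so that each label combination becomes its own counter. The helper below only builds such a name deterministically. `LabeledMetricNames`, `labeledName`, and the `name{key=value,...}` format are hypothetical conventions, and nothing here claims that GCP Monitoring will split them back into real labels.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper that encodes labels into a metric name; not a Beam API.
final class LabeledMetricNames {

    private LabeledMetricNames() {}

    // Builds e.g. "requests{region=us,status=ok}". Labels are sorted by key so
    // the same label set always produces the same name.
    static String labeledName(String baseName, Map<String, String> labels) {
        if (labels.isEmpty()) {
            return baseName;
        }
        StringJoiner joiner = new StringJoiner(",", baseName + "{", "}");
        for (Map.Entry<String, String> e : new TreeMap<>(labels).entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }
}
```

Inside a DoFn this could be used as `Metrics.counter("my-namespace", LabeledMetricNames.labeledName("requests", labels)).inc();` (the namespace is a placeholder). Keep the set of label values small, since every distinct combination creates a separate counter.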