Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread hsy...@gmail.com
Also, I looked at the code: Reshuffle seems to do some GroupBy work
internally, but I don't really need a GroupBy.

On Fri, Jan 19, 2024 at 9:35 AM hsy...@gmail.com  wrote:

> Reshuffle is deprecated.
>
> On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user 
> wrote:
>
>> Just from checking the doc here, I do not think it enforces a reshuffle:
>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys
>>
>> Have you tried just adding a Reshuffle after PubsubLiteIO?
>>
>> On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com 
>> wrote:
>>
>>> Hey guys,
>>>
>>> I have a question: does the WithKeys transform enforce a reshuffle?
>>>
>>> My pipeline basically looks like this: PubsubLiteIO -> ParDo(..) ->
>>> ParDo() -> BigQueryIO.write()
>>>
>>> The problem is that PubsubLiteIO -> ParDo(..) -> ParDo() is always fused
>>> together. But the ParDo is expensive, and I want Dataflow to put more
>>> workers on it. What's the best way to do that?
>>>
>>> Regards,
>>>
>>>


Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread hsy...@gmail.com
Reshuffle is deprecated.

On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user  wrote:

> Just from checking the doc here, I do not think it enforces a reshuffle:
> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys
>
> Have you tried just adding a Reshuffle after PubsubLiteIO?
>
> On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com  wrote:
>
>> Hey guys,
>>
>> I have a question: does the WithKeys transform enforce a reshuffle?
>>
>> My pipeline basically looks like this: PubsubLiteIO -> ParDo(..) -> ParDo()
>> -> BigQueryIO.write()
>>
>> The problem is that PubsubLiteIO -> ParDo(..) -> ParDo() is always fused
>> together. But the ParDo is expensive, and I want Dataflow to put more
>> workers on it. What's the best way to do that?
>>
>> Regards,
>>
>>


Does withkeys transform enforce a reshuffle?

2024-01-18 Thread hsy...@gmail.com
Hey guys,

I have a question: does the WithKeys transform enforce a reshuffle?

My pipeline basically looks like this: PubsubLiteIO -> ParDo(..) -> ParDo()
-> BigQueryIO.write()

The problem is that PubsubLiteIO -> ParDo(..) -> ParDo() is always fused
together. But the ParDo is expensive, and I want Dataflow to put more workers
on it. What's the best way to do that?

Regards,
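A minimal Java sketch of the suggestion made in this thread (adding a Reshuffle right after the source), assuming the Beam Java SDK and the direct runner are on the classpath. `Create.of` stands in for PubsubLiteIO so it runs locally, and `ExpensiveFn`/`expensiveWork` are hypothetical placeholders for the expensive ParDo. WithKeys by itself only attaches a key to each element. `Reshuffle.viaRandomKey()` assigns random keys and does the internal GroupBy work mentioned above, which gives the runner a boundary where it can redistribute elements instead of fusing the source with the ParDo. As noted in the thread, Reshuffle carries a deprecation notice in the SDK version discussed, so treat this as a sketch, not a recommendation.

```java
import java.util.Locale;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class BreakFusionSketch {

  // Hypothetical placeholder for the expensive per-message work.
  static String expensiveWork(String msg) {
    return msg.toUpperCase(Locale.ROOT);
  }

  // Hypothetical stand-in for the expensive ParDo from the question.
  static class ExpensiveFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String msg, OutputReceiver<String> out) {
      out.output(expensiveWork(msg));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Create.of stands in for PubsubLiteIO so the sketch runs locally.
    PCollection<String> messages = p.apply("Source", Create.of("a", "b", "c"));

    PCollection<String> processed =
        messages
            // Random keys plus an internal GroupByKey: a boundary the runner
            // can use to redistribute work instead of fusing the steps.
            .apply("BreakFusion", Reshuffle.<String>viaRandomKey())
            .apply("Expensive", ParDo.of(new ExpensiveFn()));

    // A sink such as BigQueryIO.write() would be applied to `processed` here.
    p.run().waitUntilFinish();
  }
}
```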


ParDo(DoFn) with multiple context.output vs FlatMapElements

2023-12-27 Thread hsy...@gmail.com
Hello

I have a question. I have a transform that emits one or more outputs (into
the same collection) for each input.
I can do it with ParDo + DoFn, calling context.output multiple times in the
processElement method for each input, or I can do it with FlatMapElements.
Is there any difference? Does Dataflow fuse the downstream transform
automatically? Eventually I want more workers on the downstream transform
because it needs to handle more data. How am I supposed to do that?

Regards,
Siyuan
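A minimal Java sketch of the two variants described in the question, assuming the Beam Java SDK and the direct runner are on the classpath; `split` is a hypothetical stand-in for the real one-to-many logic. Both variants produce the same elements. As far as I know, FlatMapElements is a convenience wrapper that expands into a ParDo, so the runner can fuse either variant with its downstream steps in the same way; spreading the downstream work would still need a fusion break such as the Reshuffle discussed in the WithKeys thread.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class OneToManySketch {

  // Hypothetical one-to-many logic shared by both variants.
  static List<String> split(String line) {
    return Arrays.asList(line.split(" "));
  }

  // Variant 1: a DoFn that emits once per result. Using OutputReceiver here
  // is equivalent to calling context.output(...) on a ProcessContext.
  static class SplitFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<String> out) {
      for (String word : split(line)) {
        out.output(word);
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<String> lines = p.apply(Create.of("a b", "c"));

    // Variant 1: ParDo with multiple output() calls per input element.
    PCollection<String> viaParDo = lines.apply("ViaParDo", ParDo.of(new SplitFn()));

    // Variant 2: FlatMapElements, returning an Iterable per input element.
    PCollection<String> viaFlatMap =
        lines.apply(
            "ViaFlatMap",
            FlatMapElements.into(TypeDescriptors.strings()).via((String line) -> split(line)));

    // Both PCollections hold the same elements: "a", "b", "c".
    p.run().waitUntilFinish();
  }
}
```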


pubsubliteio ack problem

2023-12-21 Thread hsy...@gmail.com
In my application, PubsubLiteIO never seems to ack the messages, and the
data lateness keeps building up. My question is: how does Dataflow know
when to ack a message? How does the engine even know when a message has
been processed?


How to set flow control for pubsubliteio?

2023-12-20 Thread hsy...@gmail.com
How do I change the flow control config for PubsubLiteIO?

I saw that the setting was taken out as part of
https://issues.apache.org/jira/browse/BEAM-14129

But without flow control set up correctly, my Beam app ingests from Pub/Sub
Lite super slowly and gets a NO_CLIENT_TOKEN error on the server side, which
suggests increasing the flow control setting.


Re: pubsubliteio is super slow

2023-12-19 Thread hsy...@gmail.com
Do you have a ticket?

Also, the Pub/Sub Lite metrics show NO_CLIENT_TOKENS.

On Tue, Dec 19, 2023 at 1:39 PM Nirav Patel  wrote:

> We have, and yes, it is super slow. I tested the Python and Java IO versions
> as well as the Beam IO. We reported this problem to Google.
>
> On Tue, Dec 19, 2023 at 10:17 AM hsy...@gmail.com 
> wrote:
>
>> Is anyone using Pub/Sub Lite? I find it super slow (5 messages/sec), and
>> there are no options for me to tune the performance.
>>
>


pubsubliteio is super slow

2023-12-19 Thread hsy...@gmail.com
Is anyone using Pub/Sub Lite? I find it super slow (5 messages/sec), and there
are no options for me to tune the performance.


Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread hsy...@gmail.com
The "Caused by" is in the error message.

On Thu, Dec 7, 2023 at 10:47 AM Reuven Lax via user 
wrote:

> This is the stack trace of the rethrown exception. The log should also
> contain a "caused by" log somewhere detailing the original exception. Do
> you happen to have that?
>
> On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com  wrote:
>
>> Here is the complete stack trace. It doesn't even hit my code, and it
>> happens consistently!
>>
>> Error message from worker: java.lang.RuntimeException:
>> java.lang.IllegalStateException
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> Caused by: java.lang.IllegalStateException
>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
>> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
>> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
>> org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread hsy...@gmail.com
Here is the complete stack trace. It doesn't even hit my code, and it happens
consistently!

Error message from worker: java.lang.RuntimeException:
java.lang.IllegalStateException
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
Caused by: java.lang.IllegalStateException
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

Regards,
Siyuan

On Wed, Dec 6, 2023 at 10:12 AM Ahmed Abualsaud 
wrote:

> Hey, can you provide the full stack trace for the error you're seeing?
> Also is this happening consistently?
>
> *+1* to raising a Google ticket where we'll have more visibility.
>
> On Wed, Dec 6, 2023 at 11:33 AM John Casey 
> wrote:
>
>> Hmm. It may be best if you raise a ticket with Google support for this. I
>> can inspect your job directly if you do that, and that will make this more
>> straightforward.
>>
>> On Wed, Dec 6, 2023 at 11:24 AM hsy...@gmail.com 
>> wrote:
>>
>>> I'm just using the Dataflow runner.
>>> On Wed, Dec 6, 202

Re: Questions about writing to BigQuery using storage api

2023-12-06 Thread hsy...@gmail.com
I'm just using the Dataflow runner.
On Wed, Dec 6, 2023 at 08:23 John Casey via user 
wrote:

> Well, that is odd. It looks like the underlying client is closed, which is
> unexpected.
>
> Do you see any retries in your pipeline? Also, what runner are you using?
>
> @Ahmed Abualsaud  this might be interesting to
> you too
>
> On Tue, Dec 5, 2023 at 9:39 PM hsy...@gmail.com  wrote:
>
>> I'm using version 2.51.0, and the configuration looks like this:
>>
>> write
>> .withoutValidation()
>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>> .withExtendedErrorInfo()
>> .withMethod(Write.Method.STORAGE_WRITE_API)
>> .withTriggeringFrequency(Duration.standardSeconds(10))
>> .withAutoSharding().optimizedWrites()
>> .withFailedInsertRetryPolicy(retryTransientErrors());
>>
>>
>> On Tue, Dec 5, 2023 at 11:20 AM John Casey via user 
>> wrote:
>>
>>> Hi,
>>>
>>> Could you add some more detail? Which beam version are you using?
>>>
>>>
>>> On Tue, Dec 5, 2023 at 1:52 PM hsy...@gmail.com 
>>> wrote:
>>>
>>>> Does anyone have experience writing to BQ using the Storage Write API?
>>>>
>>>> I tried it because, according to the documentation, it is more efficient,
>>>> but I got the error below:
>>>>
>>>> 2023-12-05 04:01:29.741 PST
>>>> Error message from worker: java.lang.RuntimeException:
>>>> java.lang.IllegalStateException
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>>>> Caused by: java.lang.IllegalStateException
>>>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
>>>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>
>>>


Re: Questions about writing to BigQuery using storage api

2023-12-05 Thread hsy...@gmail.com
I'm using version 2.51.0, and the configuration looks like this:

write
    .withoutValidation()
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withExtendedErrorInfo()
    .withMethod(Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(10))
    .withAutoSharding()
    .optimizedWrites()
    .withFailedInsertRetryPolicy(retryTransientErrors());


On Tue, Dec 5, 2023 at 11:20 AM John Casey via user 
wrote:

> Hi,
>
> Could you add some more detail? Which beam version are you using?
>
>
> On Tue, Dec 5, 2023 at 1:52 PM hsy...@gmail.com  wrote:
>
>> Does anyone have experience writing to BQ using the Storage Write API?
>>
>> I tried it because, according to the documentation, it is more efficient,
>> but I got the error below:
>>
>> 2023-12-05 04:01:29.741 PST
>> Error message from worker: java.lang.RuntimeException:
>> java.lang.IllegalStateException
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> Caused by: java.lang.IllegalStateException
>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>>
>


Questions about writing to BigQuery using storage api

2023-12-05 Thread hsy...@gmail.com
Does anyone have experience writing to BQ using the Storage Write API?

I tried it because, according to the documentation, it is more efficient,
but I got the error below:

2023-12-05 04:01:29.741 PST
Error message from worker: java.lang.RuntimeException:
java.lang.IllegalStateException
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
Caused by: java.lang.IllegalStateException
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)


Watermark never progresses for Deduplicate transform

2023-09-06 Thread hsy...@gmail.com
Hello,

I'm using the
https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html
transform to dedup my data, but on the monitoring page I see that the
watermark is not moving forward. Is this common for that transform?

Thanks


Question about metrics

2023-05-12 Thread hsy...@gmail.com
Hi, I have a question about metrics. I want to use the Beam metrics API to
send metrics to GCP Monitoring. Instead of collecting just simple numeric
values, I also need to send labels along with them. Is there a way to do
that? Thanks!