Re: Histogram metrics in Dataflow/Beam

2022-05-06 Thread Siyu Lin
Got it! Thanks so much, and will try it out! 

-siyu

> 
> On May 6, 2022, at 4:53 PM, Pablo Estrada  wrote:
> 
> 
> Hi Siyu!
> Yeah, that's a good question!
> 
> Depending on the properties of the client that you're using, you can create 
> it as a class/static variable rather than an instance variable:
> 
> class MyPrometheusDoFn extends DoFn<...> {
>   static PrometheusClient client = Prometheus.client();   // Or whatever
> }
> 
> You just need to consider that in this scenario, the variable `client` may be 
> used by many threads at the same time - so you need to make sure that 
> PrometheusClient is thread safe, or you need to synchronize accesses to it. 
> (maybe it supports some kind of async call?)
> 
> Let me know if that makes sense?
> Best
> -P.
> 
>> On Fri, May 6, 2022 at 4:33 PM Siyu Lin  wrote:
>> sorry for typo “recommissioned way” should be “recommended way”
>> 
>> 
>>>> On May 6, 2022, at 4:31 PM, Siyu Lin  wrote:
>>>> 
>>> 
>>> Hi Pablo,
>>> 
>>> Thanks so much for your explanation! 
>>> 
>>> Also, for prom client in the do fn, do we need to initialize them in the 
>>> setup code? My concern is that if we have this initialization in each do 
>>> fn, we might overwhelm the memory. I think we can pass them as the 
>>> parameter? Do you have a recommissioned way to do that not to hurt the 
>>> performance?
>>> 
>>> Will try that out to see if this is workable! 
>>> 
>>> Siyu
>>> 
>>>> 
>>>>> On May 6, 2022, at 3:35 PM, Pablo Estrada  wrote:
>>>>> 
>>>> 
>>>> ah - no, I am not aware of anyone running side containers.
>>>> 
>>>> You can add libraries to your container that will be available, so that 
>>>> you can import prometheus client libraries in your DoFns and export 
>>>> metrics directly 'by hand' (e.g. processElement(..) {  
>>>> prometheusClient.reportMetric("name", TYPE, value); })
>>>> 
>>>> If you need to run another program in your worker - I suppose you can add 
>>>> new binaries that run next to your Beam worker processes (in the same 
>>>> container, and you have to start them up in the entrypoint of your 
>>>> Dockerfile). I've never seen anyone do this, but it sounds quite doable.
>>>> 
>>>> Do any of these options work for you?
>>>> Best
>>>> -P.
>>>> 
>>>>> On Fri, May 6, 2022 at 3:16 PM Siyu Lin  wrote:
>>>>> Hi Pablo,
>>>>> 
>>>>> No worries at all! 
>>>>> 
>>>>> Was wondering if “add any dependencies” means to add a side container to 
>>>>> the dataflow runner? Like prometheus scraper? 
>>>>> 
>>>>> If so, is it only working for dataflow runner v2? We have not upgraded to 
>>>>> v2 yet so it might be hard to do that in v1.
>>>>> 
>>>>> Thanks so much!
>>>>> 
>>>>> Siyu
>>>>> 
>>>>>> On May 6, 2022, at 11:57 AM, Pablo Estrada  wrote:
>>>>>> 
>>>>>> Sorry about the delay!
>>>>>> 
>>>>>> Yes, you can add any dependencies to your image - and you can add custom 
>>>>>> reporting of metrics that you're tracking directly. That may help?
>>>>>> 
>>>>>>> On Mon, Apr 4, 2022 at 7:10 PM Siyu Lin  wrote:
>>>>>>> Hi Jeff,
>>>>>>> 
>>>>>>> Thanks so much for your quick responses. It is unfortunate that 
>>>>>>> histogram is unavailable in dataflow. Do you know if there are any 
>>>>>>> workaround? Or do you think it is plausible if we can use runner v2 and 
>>>>>>> customize the image with Prometheus exporter?
>>>>>>> 
>>>>>>> Thanks again! 
>>>>>>> Siyu
>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Apr 4, 2022, at 5:00 PM, Jeff Klukas  wrote:
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> Siyu - The Beam metrics interface includes the Distribution metric 
>>>>>>>> type which can be used for histograms:
>>>>>>>> 
>>>>>>>> https://beam.apache.org/documentation/programming-guide/#types-of-metrics
>>>>>>>> 
>>>>>>>> Particulars of support depend on the runner. For Cloud Dataflow, the 
>>>>>>>> reported values are MAX, MIN, MEAN, and COUNT, so no support for 
>>>>>>>> finer-grained percentiles:
>>>>>>>> 
>>>>>>>> https://cloud.google.com/dataflow/docs/guides/using-cloud-monitoring#custom_metrics
>>>>>>>> 
>>>>>>>>> On Mon, Apr 4, 2022 at 7:45 PM Siyu Lin  wrote:
>>>>>>>>> Hi Beam community,
>>>>>>>>> 
>>>>>>>>> I am wondering if there is histogram metrics available (or 
>>>>>>>>> alternative recommendations) for showing up quantiles. We have 
>>>>>>>>> counter metrics already but we would also like to see some quantiles 
>>>>>>>>> for different values. 
>>>>>>>>> 
>>>>>>>>> Thanks a lot!
>>>>>>>>> Siyu
>>>>> 
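
Pablo's suggestion above (one static client per worker JVM, shared by every DoFn instance and safe for concurrent use) can be sketched in plain Java. This is only an illustration under assumptions: FakeMetricsClient, MyReportingFn and SharedClientDemo are hypothetical stand-ins, not Prometheus or Beam classes, and the real client's thread-safety would still need to be checked.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a metrics client; not a real Prometheus API.
final class FakeMetricsClient {
    static final AtomicInteger INSTANCES = new AtomicInteger();
    private final ConcurrentHashMap<String, LongAdder> totals = new ConcurrentHashMap<>();

    FakeMetricsClient() {
        INSTANCES.incrementAndGet();
    }

    // Thread safe: ConcurrentHashMap and LongAdder tolerate concurrent callers.
    void report(String name, long value) {
        totals.computeIfAbsent(name, k -> new LongAdder()).add(value);
    }

    long total(String name) {
        LongAdder adder = totals.get(name);
        return adder == null ? 0L : adder.sum();
    }
}

// Hypothetical stand-in for a DoFn: many instances, but one client per JVM.
final class MyReportingFn {
    // Initialization-on-demand holder: the JVM creates CLIENT once, lazily.
    private static final class Holder {
        static final FakeMetricsClient CLIENT = new FakeMetricsClient();
    }

    static FakeMetricsClient client() {
        return Holder.CLIENT;
    }

    void processElement(long value) {
        client().report("element_value", value);
    }
}

final class SharedClientDemo {
    // Starts `threads` threads, each with its own MyReportingFn, and returns the shared total.
    static long run(int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                MyReportingFn fn = new MyReportingFn();
                for (int j = 0; j < perThread; j++) {
                    fn.processElement(1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return MyReportingFn.client().total("element_value");
    }
}
```

Because the client lives in a static holder, creating more function instances does not create more clients, which addresses the memory concern; the price is that report() must be safe to call from many threads at once.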


Re: Histogram metrics in Dataflow/Beam

2022-05-06 Thread Siyu Lin
sorry for typo “recommissioned way” should be “recommended way”


> On May 6, 2022, at 4:31 PM, Siyu Lin  wrote:
> 
> 
> Hi Pablo,
> 
> Thanks so much for your explanation! 
> 
> Also, for prom client in the do fn, do we need to initialize them in the 
> setup code? My concern is that if we have this initialization in each do fn, 
> we might overwhelm the memory. I think we can pass them as the parameter? Do 
> you have a recommissioned way to do that not to hurt the performance?
> 
> Will try that out to see if this is workable! 
> 
> Siyu
> 
>> 
>>> On May 6, 2022, at 3:35 PM, Pablo Estrada  wrote:
>>> 
>> 
>> ah - no, I am not aware of anyone running side containers.
>> 
>> You can add libraries to your container that will be available, so that you 
>> can import prometheus client libraries in your DoFns and export metrics 
>> directly 'by hand' (e.g. processElement(..) {  
>> prometheusClient.reportMetric("name", TYPE, value); })
>> 
>> If you need to run another program in your worker - I suppose you can add 
>> new binaries that run next to your Beam worker processes (in the same 
>> container, and you have to start them up in the entrypoint of your 
>> Dockerfile). I've never seen anyone do this, but it sounds quite doable.
>> 
>> Do any of these options work for you?
>> Best
>> -P.
>> 
>>> On Fri, May 6, 2022 at 3:16 PM Siyu Lin  wrote:
>>> Hi Pablo,
>>> 
>>> No worries at all! 
>>> 
>>> Was wondering if “add any dependencies” means to add a side container to 
>>> the dataflow runner? Like prometheus scraper? 
>>> 
>>> If so, is it only working for dataflow runner v2? We have not upgraded to 
>>> v2 yet so it might be hard to do that in v1.
>>> 
>>> Thanks so much!
>>> 
>>> Siyu
>>> 
>>>> On May 6, 2022, at 11:57 AM, Pablo Estrada  wrote:
>>>> 
>>>> Sorry about the delay!
>>>> 
>>>> Yes, you can add any dependencies to your image - and you can add custom 
>>>> reporting of metrics that you're tracking directly. That may help?
>>>> 
>>>>> On Mon, Apr 4, 2022 at 7:10 PM Siyu Lin  wrote:
>>>>> Hi Jeff,
>>>>> 
>>>>> Thanks so much for your quick responses. It is unfortunate that histogram 
>>>>> is unavailable in dataflow. Do you know if there are any workaround? Or 
>>>>> do you think it is plausible if we can use runner v2 and customize the 
>>>>> image with Prometheus exporter?
>>>>> 
>>>>> Thanks again! 
>>>>> Siyu
>>>>> 
>>>>>> 
>>>>>>> On Apr 4, 2022, at 5:00 PM, Jeff Klukas  wrote:
>>>>>>> 
>>>>>> 
>>>>>> Siyu - The Beam metrics interface includes the Distribution metric type 
>>>>>> which can be used for histograms:
>>>>>> 
>>>>>> https://beam.apache.org/documentation/programming-guide/#types-of-metrics
>>>>>> 
>>>>>> Particulars of support depend on the runner. For Cloud Dataflow, the 
>>>>>> reported values are MAX, MIN, MEAN, and COUNT, so no support for 
>>>>>> finer-grained percentiles:
>>>>>> 
>>>>>> https://cloud.google.com/dataflow/docs/guides/using-cloud-monitoring#custom_metrics
>>>>>> 
>>>>>>> On Mon, Apr 4, 2022 at 7:45 PM Siyu Lin  wrote:
>>>>>>> Hi Beam community,
>>>>>>> 
>>>>>>> I am wondering if there is histogram metrics available (or alternative 
>>>>>>> recommendations) for showing up quantiles. We have counter metrics 
>>>>>>> already but we would also like to see some quantiles for different 
>>>>>>> values. 
>>>>>>> 
>>>>>>> Thanks a lot!
>>>>>>> Siyu
>>> 


Re: Histogram metrics in Dataflow/Beam

2022-05-06 Thread Siyu Lin
Hi Pablo,

Thanks so much for your explanation! 

Also, for prom client in the do fn, do we need to initialize them in the setup 
code? My concern is that if we have this initialization in each do fn, we might 
overwhelm the memory. I think we can pass them as the parameter? Do you have a 
recommissioned way to do that not to hurt the performance?

Will try that out to see if this is workable! 

Siyu

> 
> On May 6, 2022, at 3:35 PM, Pablo Estrada  wrote:
> 
> 
> ah - no, I am not aware of anyone running side containers.
> 
> You can add libraries to your container that will be available, so that you 
> can import prometheus client libraries in your DoFns and export metrics 
> directly 'by hand' (e.g. processElement(..) {  
> prometheusClient.reportMetric("name", TYPE, value); })
> 
> If you need to run another program in your worker - I suppose you can add new 
> binaries that run next to your Beam worker processes (in the same container, 
> and you have to start them up in the entrypoint of your Dockerfile). I've 
> never seen anyone do this, but it sounds quite doable.
> 
> Do any of these options work for you?
> Best
> -P.
> 
>> On Fri, May 6, 2022 at 3:16 PM Siyu Lin  wrote:
>> Hi Pablo,
>> 
>> No worries at all! 
>> 
>> Was wondering if “add any dependencies” means to add a side container to the 
>> dataflow runner? Like prometheus scraper? 
>> 
>> If so, is it only working for dataflow runner v2? We have not upgraded to v2 
>> yet so it might be hard to do that in v1.
>> 
>> Thanks so much!
>> 
>> Siyu
>> 
>>> On May 6, 2022, at 11:57 AM, Pablo Estrada  wrote:
>>> 
>>> Sorry about the delay!
>>> 
>>> Yes, you can add any dependencies to your image - and you can add custom 
>>> reporting of metrics that you're tracking directly. That may help?
>>> 
>>>> On Mon, Apr 4, 2022 at 7:10 PM Siyu Lin  wrote:
>>>> Hi Jeff,
>>>> 
>>>> Thanks so much for your quick responses. It is unfortunate that histogram 
>>>> is unavailable in dataflow. Do you know if there are any workaround? Or do 
>>>> you think it is plausible if we can use runner v2 and customize the image 
>>>> with Prometheus exporter?
>>>> 
>>>> Thanks again! 
>>>> Siyu
>>>> 
>>>>> 
>>>>>> On Apr 4, 2022, at 5:00 PM, Jeff Klukas  wrote:
>>>>>> 
>>>>> 
>>>>> Siyu - The Beam metrics interface includes the Distribution metric type 
>>>>> which can be used for histograms:
>>>>> 
>>>>> https://beam.apache.org/documentation/programming-guide/#types-of-metrics
>>>>> 
>>>>> Particulars of support depend on the runner. For Cloud Dataflow, the 
>>>>> reported values are MAX, MIN, MEAN, and COUNT, so no support for 
>>>>> finer-grained percentiles:
>>>>> 
>>>>> https://cloud.google.com/dataflow/docs/guides/using-cloud-monitoring#custom_metrics
>>>>> 
>>>>>> On Mon, Apr 4, 2022 at 7:45 PM Siyu Lin  wrote:
>>>>>> Hi Beam community,
>>>>>> 
>>>>>> I am wondering if there is histogram metrics available (or alternative 
>>>>>> recommendations) for showing up quantiles. We have counter metrics 
>>>>>> already but we would also like to see some quantiles for different 
>>>>>> values. 
>>>>>> 
>>>>>> Thanks a lot!
>>>>>> Siyu
>> 


Re: Histogram metrics in Dataflow/Beam

2022-05-06 Thread Siyu Lin
Hi Pablo,

No worries at all! 

Was wondering if “add any dependencies” means adding a side container to the 
Dataflow runner, like a Prometheus scraper? 

If so, does that only work with Dataflow Runner v2? We have not upgraded to v2 
yet, so it might be hard to do in v1.

Thanks so much!

Siyu

> On May 6, 2022, at 11:57 AM, Pablo Estrada  wrote:
> 
> Sorry about the delay!
> 
> Yes, you can add any dependencies to your image - and you can add custom 
> reporting of metrics that you're tracking directly. That may help?
> 
> On Mon, Apr 4, 2022 at 7:10 PM Siyu Lin  wrote:
> Hi Jeff,
> 
> Thanks so much for your quick responses. It is unfortunate that histogram is 
> unavailable in dataflow. Do you know if there are any workaround? Or do you 
> think it is plausible if we can use runner v2 and customize the image with 
> Prometheus exporter?
> 
> Thanks again! 
> Siyu
> 
>> 
>> On Apr 4, 2022, at 5:00 PM, Jeff Klukas  wrote:
>> 
>> 
>> Siyu - The Beam metrics interface includes the Distribution metric type 
>> which can be used for histograms:
>> 
>> https://beam.apache.org/documentation/programming-guide/#types-of-metrics 
>> 
>> Particulars of support depend on the runner. For Cloud Dataflow, the 
>> reported values are MAX, MIN, MEAN, and COUNT, so no support for 
>> finer-grained percentiles:
>> 
>> https://cloud.google.com/dataflow/docs/guides/using-cloud-monitoring#custom_metrics
>> 
>> On Mon, Apr 4, 2022 at 7:45 PM Siyu Lin  wrote:
>> Hi Beam community,
>> 
>> I am wondering if there is histogram metrics available (or alternative 
>> recommendations) for showing up quantiles. We have counter metrics already 
>> but we would also like to see some quantiles for different values. 
>> 
>> Thanks a lot!
>> Siyu



Re: IllegalMutationException in PTransform

2022-05-06 Thread Siyu Lin
Hi Reuven,

Do you mean we should explicitly define a coder for every input and output 
type in the chained DoFns? Do we also need to define compareTo and equals?

thanks again!
Siyu

> On May 6, 2022, at 12:23 PM, Reuven Lax  wrote:
> 
> 
> Could be - I would check the implementation of inputAdaptor.
> 
>> On Fri, May 6, 2022 at 11:59 AM Yuri Jin  wrote:
>> Thanks, I'll check it out.
>> 
>> I split inputAdaptor.adapt() into different DoFn for testing and it threw 
>> the same exception for the new DoFn. So I guess it's because of 
>> inputAdaptor.adapt().
>> 
>>> On Fri, May 6, 2022 at 11:45 AM Reuven Lax  wrote:
>>> I meant to say .equals() not compareTo.
>>> 
>>>> On Fri, May 6, 2022 at 11:44 AM Reuven Lax  wrote:
>>>> Unfortunately I'm not very familiar with Scio. However this could also be 
>>>> caused by an object that either doesn't properly implement the compareTo 
>>>> method or the coder doesn't return such an object in structuralValue.
>>>> 
> On Fri, May 6, 2022 at 11:26 AM Yuri Jin  wrote:
> Reuven, thanks for the reply.
> 
> The input type is "KafkaRecord[Array[Byte], Array[Byte]]" and uses the 
> "KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of), 
> ByteArrayCoder.of)" coder.
> I can't paste the code for DoFn due to company policy, but here's the 
> structure:
> 
> //
> Pipeline.scala
> ---
> type InputType = KafkaRecord[Array[Byte], Array[Byte]]
> 
> //
> ParsePayloadDoFn.scala
> ---
> class ParsePayloadDoFn[InputType](
>   inputAdaptor: RowAdaptor[InputType],
>   ...
>   deadLetterTag: TupleTag[KV[KeyType, ValueType]]) extends DoFn[InputType, OutputType] {
> 
>   @Setup
>   def setup(): Unit =
>     inputAdaptor.setup()
> 
>   @ProcessElement
>   def processElement(c: DoFn[InputType, OutputType]#ProcessContext): Unit =
>     Try {
>       val result: Result = inputAdaptor.adapt(c.element()) // parse payload
>       ...
>       val deadLetterMessageFn: KV[KeyType, ValueType] => Unit = c.output(deadLetterTag, _)
>       val outputPayloadFn: OutputType => Unit = c.output
> 
>       result.protocol match {
>         case error: ErrorType =>
>           deadLetterMessageFn(KV.of(..., ...))
>         case payload: Payload =>
>           payload.events.zipWithIndex.foreach {
>             case failure: ParsingFailure =>
>               deadLetterMessageFn(KV.of(..., ...))
>             case (message: Message, index: Int) =>
>               // extract body from Message
>               val body = ...
>               // make a GET http call and compose output
>               val output = ...
> 
>               outputPayloadFn(
>                 OutputType(
>                   output,
>                   ...
>                   payload.header,
>                   body,
>                   index
>                 )
>               )
>           }
>       }
>     } match {
>       case Failure(exception) =>
>         error(
>           s"ParsePayloadDoFn - unhandled exception: ${exception.getMessage}\nStack trace: ${ExceptionUtils.getStackTrace(exception)}"
>         )
>       case Success(_) => ()
>     }
> }
> //
> 
> For reference, we are using Scio v0.11.5 and Beam v2.36.0.
> 
> Thank you,
> Yuri Jin
> 
> 
>> On Thu, May 5, 2022 at 10:36 PM Reuven Lax  wrote:
>> What is the type of the input - do you have a custom coder? Are you able 
>> to paste the code for your DoFn?
>> 
>> In answer to your question - Direct runner tests for this, because it is 
>> a testing runner. This error scenario can cause random unexpected 
>> behavior in production runners, which is why the testing runner tries to 
>> explicitly detect it.
>> 
>> Reuven
>> 
>>> On Thu, May 5, 2022 at 8:52 PM Yuri Jin  wrote:
>>> Hi Beam users,
>>> 
>>> We have a DoFn that reads data from Kafka and parses an array byte 
>>> payload. It works fine with dataflow runner, but throws 
>>> IllegalMutationException with direct runner. It does not directly 
>>> modify the input value. Therefore, I am guessing that the output is 
>>> different when there are multiple input values.
>>> 
>>> The detailed error is as follows.
>>> Exception in thread "main" 
>>> org.apache.beam.sdk.util.IllegalMutationException: PTransform Parse 
>>> Payload mutated value "DoFnOutputA" after it was output (new value was 
>>> "DoFnOutputB"). Values must not be mutated in any way after being 
>>> output.
>>> at 
>>
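
Reuven's point about equals() can be illustrated with plain Java. A Scala Array[Byte] is a JVM byte[], and JVM arrays compare by identity, so two copies of the same bytes are not equal unless something compares the contents. The sketch below is only an illustration of that general behaviour and not a diagnosis of the pipeline in this thread; the Payload wrapper and ArrayEqualityDemo are hypothetical names, not Beam or Scio classes.

```java
import java.util.Arrays;

// Hypothetical value wrapper; not a Beam or Scio class.
final class Payload {
    private final byte[] bytes;

    Payload(byte[] bytes) {
        // Defensive copy so later changes to the caller's array cannot alter this value.
        this.bytes = bytes.clone();
    }

    // Content-based equality: two copies holding the same bytes compare equal.
    @Override
    public boolean equals(Object o) {
        return o instanceof Payload && Arrays.equals(bytes, ((Payload) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class ArrayEqualityDemo {
    // Arrays inherit Object.equals, which compares references, not contents.
    static boolean rawArraysEqual() {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        return a.equals(b);
    }

    // Arrays.equals compares element by element.
    static boolean contentsEqual() {
        return Arrays.equals(new byte[] {1, 2, 3}, new byte[] {1, 2, 3});
    }

    // The wrapper delegates to Arrays.equals, so equal contents mean equal values.
    static boolean wrappedEqual() {
        return new Payload(new byte[] {1, 2, 3}).equals(new Payload(new byte[] {1, 2, 3}));
    }
}
```

If an output type holds raw arrays and relies on the default equals(), a check that compares a value with a re-decoded copy of itself can report a difference even though nothing was modified; whether that is what happens inside inputAdaptor.adapt() here is an assumption that would need to be verified.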

Breaking change for FileIO WriteDynamic in Beam 2.34?

2022-04-05 Thread Siyu Lin
Hi Beam community,

We have a batch pipeline which does not run regularly. Recently we
have upgraded to Beam 2.36 and this broke the FileIO WriteDynamic
process.

We are using Dataflow Runner, and the errors are like this when there
are multiple workers:

Error message from worker: java.lang.NoClassDefFoundError: Could not
initialize class
org.apache.beam.runners.dataflow.worker.ApplianceShuffleWriter
org.apache.beam.runners.dataflow.worker.ShuffleSink.writer(ShuffleSink.java:348)
org.apache.beam.runners.dataflow.worker.SizeReportingSinkWrapper.writer(SizeReportingSinkWrapper.java:46)
org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.initializeWriter(WriteOperation.java:71)
org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation.start(WriteOperation.java:78)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

However when there is only a single worker, the error is like this:

The job failed because a work item has failed 4 times. Look in
previous log entries for the cause of each one of the 4 failures. For
more information, see
https://cloud.google.com/dataflow/docs/guides/common-errors. The work
item was attempted on these workers: xxx Root cause: The worker lost
contact with the service.,

The error guide suggested upgrading the machine type.

Those errors happen when using SDK 2.34+. When I switched to SDK 2.33,
everything worked well without any issues. Tried SDK 2.34, 2.35 and
2.36, and found all of them got the same issue.

Context: The code simply reads a fixed BigQuery table of 4,034 records,
applies some transforms, and writes to GCS with FileIO.WriteDynamic. All
tests were performed using the same machine type and the same number of
workers.

Does anyone know if there are any breaking changes in this SDK /
Dataflow runner?

Thanks so much!
Siyu


Re: Histogram metrics in Dataflow/Beam

2022-04-04 Thread Siyu Lin
Hi Jeff,

Thanks so much for your quick response. It is unfortunate that histograms are 
unavailable in Dataflow. Do you know of any workarounds? Or do you think it 
would be feasible to use Runner v2 and customize the image with a Prometheus 
exporter?

Thanks again! 
Siyu

> 
> On Apr 4, 2022, at 5:00 PM, Jeff Klukas  wrote:
> 
> 
> Siyu - The Beam metrics interface includes the Distribution metric type which 
> can be used for histograms:
> 
> https://beam.apache.org/documentation/programming-guide/#types-of-metrics
> 
> Particulars of support depend on the runner. For Cloud Dataflow, the reported 
> values are MAX, MIN, MEAN, and COUNT, so no support for finer-grained 
> percentiles:
> 
> https://cloud.google.com/dataflow/docs/guides/using-cloud-monitoring#custom_metrics
> 
>> On Mon, Apr 4, 2022 at 7:45 PM Siyu Lin  wrote:
>> Hi Beam community,
>> 
>> I am wondering if there is histogram metrics available (or alternative 
>> recommendations) for showing up quantiles. We have counter metrics already 
>> but we would also like to see some quantiles for different values. 
>> 
>> Thanks a lot!
>> Siyu
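
Jeff's point that MAX, MIN, MEAN and COUNT cannot yield percentiles can be checked with a small plain-Java sketch. It does not use Beam; the class name and the two sample data sets are made up for illustration. Both samples produce the same four summary values, yet their medians differ, so the median cannot be recovered from the summary alone.

```java
import java.util.Arrays;
import java.util.LongSummaryStatistics;

final class SummaryVsQuantiles {
    // Two made-up samples with identical MIN, MAX, MEAN and COUNT.
    static final long[] A = {0, 0, 5, 10, 10};
    static final long[] B = {0, 1, 4, 10, 10};

    // Formats the four values that are reported for a Distribution metric.
    static String summary(long[] values) {
        LongSummaryStatistics s = Arrays.stream(values).summaryStatistics();
        return "min=" + s.getMin() + " max=" + s.getMax()
            + " mean=" + s.getAverage() + " count=" + s.getCount();
    }

    // Median of an odd-length sample: the middle element after sorting a copy.
    static long median(long[] values) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2];
    }
}
```

Here summary(A) and summary(B) are the same string, while median(A) is 5 and median(B) is 4. Getting real quantiles therefore needs either the raw values or per-bucket counts exported some other way.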


Histogram metrics in Dataflow/Beam

2022-04-04 Thread Siyu Lin
Hi Beam community,

I am wondering whether any histogram metrics (or recommended alternatives) are 
available for showing quantiles. We already have counter metrics, but we would 
also like to see quantiles for various values. 

Thanks a lot!
Siyu

Re: "Slowly updating global window side inputs" example buggy?

2022-03-28 Thread Siyu Lin
Hi Reza,

I just read a blog post [1] that you wrote two years ago, in which you mentioned: 

> Because this pattern uses a global-window SideInput, matching to elements 
> being processed will be nondeterministic.

Do you mean that two workers processing the same windowed main input could see 
different values of the global-window side input?

Also, if that is the case, do you think “Flatten, FixedWindow” will solve this 
problem?

Thanks so much!
Siyu

[1]: 
https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1

> On Mar 28, 2022, at 7:48 PM, Reza Rokni  wrote:
> 
> So, digging a little into this: 
> 
> To simulate the condition, if we have a PCollectionList with two 
> GenerateSequence sources, it should result in the "more than one element in 
> the View" error. 
> 
> Then, rather than applying the GlobalWindow right after, we do a Flatten, 
> FixedWindow, Sum; this should ensure that only one value comes into the DoFn 
> that calls readTestData(). We then apply the GlobalWindow before the View 
> as Singleton. If you remove the FixedWindow and the Sum, the code below should 
> fail; with them there is no error, but I need to think about whether that 
> actually covers all bases. 
> 
> PCollection<Long> toMany =
>     PCollectionList.of(p.apply(GenerateSequence.from(0)
>             .withRate(1, Duration.standardSeconds(5L))))
>         .and(p.apply(GenerateSequence.from(0)
>             .withRate(1, Duration.standardSeconds(5L))))
>         .apply(Flatten.pCollections())
>         .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
>         .apply(Sum.longsGlobally().withoutDefaults());
> 
> // Create a side input that updates each second.
> PCollectionView<Map<String, String>> map =
>     toMany
>         .apply(
>             ParDo.of(
>                 new DoFn<Long, Map<String, String>>() {
> 
>                   @ProcessElement
>                   public void process(
>                       @Element Long input,
>                       OutputReceiver<Map<String, String>> o) {
>                     // Replace map with test data from the placeholder external service.
>                     // Add external reads here.
>                     o.output(PlaceholderExternalService.readTestData());
>                   }
>                 }))
>         .apply(
>             Window.<Map<String, String>>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>                 .discardingFiredPanes())
>         .apply(View.asSingleton());
> 
> 
> 
> On Thu, Mar 3, 2022 at 6:25 PM John Gerassimou  wrote:
> Has anyone gotten a chance to investigate? Our team likes the benefits of 
> this pattern. We are hesitant to use it in production as it presents a risk.
> 
> On Tue, Feb 22, 2022 at 3:11 PM Reza Rokni  wrote:
> Yup, this looks to be a buggy example. The bug is probably not often 
> encountered, as one of the properties of this example is that it is 
> non-deterministic as to when the values will be pushed to workers. So most 
> uses will set a larger value for the duration, which would then hit the bug 
> only in very rare cases.
> 
> I will try to run some tests using Reuven's Latest method and update the 
> example. 
> 
> On Tue, Feb 22, 2022 at 11:29 AM John Gerassimou  wrote:
> I also had issues using this pattern. In most cases, it works fine, but the 
> duplicate error showed up after 4000 or so triggers using a 30-second timer. 
> I've tried to apply aggregation before View Singleton to enforce a single 
> element, but that didn't solve the issue.
> 
> Setting the timer to 5-minutes seemed to alleviate (or delay) the problem but 
> I need to do more testing. 
> 
> On Tue, Feb 22, 2022 at 2:22 PM Reuven Lax  wrote:
> elementCountAtLeast only guarantees a lower bound on the elements in a pane. 
> No upper bound is guaranteed.
> 
> On Tue, Feb 22, 2022 at 11:16 AM Steve Niemitz  wrote:
> Does "Repeatedly.forever(AfterPane.elementCountAtLeast(1))" solve this?  At 
> least in my tests it seems like this correctly only emits a single element 
> per pane, but I'm not sure how much of a guarantee there actually is that 
> there will never be more than N elements in a pane when 
> elementCountAtLeast(N) is set.
> 
> On Tue, Feb 22, 2022 at 2:06 PM Luke Cwik  wrote:
> I'm not certain that Latest would work since the processing time trigger 
> would still cause multiple firings to occur each producing the "latest" at 
> that point in time. All these firings would effectively be output to the 
> PCollection that the view is over. The PCollection would effectively be a 
> concatenation of all these firings.
> 
> 
> 
> On Tue, Feb 22, 2022 at 10:57 AM Pavel Solomin  

Java SDK: Is there a system property to print out detailed stack trace in test?

2022-01-24 Thread Siyu Lin
Hi all,

Was wondering if we can set a system property to print out detailed errors 
when tests throw exceptions or errors. I remember setting it up before but 
have forgotten the exact name.

Appreciate any help!
-siyu

Re: Beam program with Flink runner which can limit number of records processed in specified time when reading from Kafka

2021-12-15 Thread Siyu Lin
Hi Luke,

I had the same issue before. Was wondering if it is possible to use a
different timestamp as the watermark. Thanks!

-siyu

On Fri, Nov 19, 2021 at 11:42 AM Luke Cwik  wrote:

> The issue seems to be that when doing a backfill Kafka is using wall time
> to estimate the watermark and then producing massive amounts of records for
> each interval window.
>
> If you set the watermark estimator within Kafka to be based upon data
> within the records then your interval windows will have pretty consistent
> sizes based upon how many records actually exist in those 10 minute windows.
>
> On Mon, Oct 4, 2021 at 2:50 PM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi,
>>
>> I have Beam code on the Flink runner which reads from Kafka, applies a
>> 10-minute window and writes the data to S3 in Parquet format. It
>> runs fine when everything goes well. But if, due to some issue, my
>> pipeline stops running for an hour or two, then to catch up from the
>> latest Flink checkpoint it tries to read data from Kafka at a very high
>> rate and dump it to S3 in Parquet format. As the data processed in
>> the latest 10-minute window is huge because of catching up with the
>> lag, it fails with out of memory and never manages to run
>> successfully with my current resources. I checked that there is a Beam
>> property called maxBundleSize through which we can control the maximum size
>> of a bundle, but I didn’t find any property to limit the number of bundles
>> processed within the window interval.
>>
>>
>>
>> I wanted to check if there is any way to limit the number of records
>> processed within a window interval.
>>
>>
>>
>> Thanks,
>>
>> Sandeep
>>
>
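
Luke's explanation can be illustrated with a simplified plain-Java simulation. It does not use KafkaIO or a real watermark estimator; the class name, the record rate and the timestamps are made up. It only shows how the choice of timestamp changes how many records land in each 10-minute window during a backfill.

```java
import java.util.Map;
import java.util.TreeMap;

final class WindowAssignmentDemo {
    static final long WINDOW_MS = 10 * 60 * 1000L; // 10-minute fixed windows

    // Counts records per fixed window, keyed by window start in milliseconds.
    static Map<Long, Integer> countPerWindow(long[] timestampsMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            long windowStart = ts - (ts % WINDOW_MS);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    // Simulated backlog: one record per minute across two hours of event time.
    static long[] eventTimes() {
        long[] ts = new long[120];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i * 60_000L;
        }
        return ts;
    }

    // During catch-up the same 120 records are read within about one second of wall time.
    static long[] wallClockTimes(long nowMs) {
        long[] ts = new long[120];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = nowMs + i * 8L;
        }
        return ts;
    }
}
```

With record timestamps, the two-hour backlog spreads over 12 windows of 10 records each. With wall-clock timestamps, all 120 records fall into a single window, which is the situation that exhausts memory in the thread above.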


Re: [Question] Testing interaction of streaming main input and slowly updating side input

2021-09-13 Thread Siyu Lin
Hi Johannes,

L20 in your code will make it run forever so it is better to put a finite 
number there, like 
`GenerateSequence.from(0).to(1).withRate(1, Duration.standardSeconds(2))`

-siyu

> On Sep 10, 2021, at 6:06 AM, Johannes Frey  wrote:
> 
> Hi everybody,
> 
> I'm currently having a hard time wrapping my head around streaming
> data processing.
> 
> Scenario:
> I have a main stream of data that is going to be processed (orders
> entering the system) and at some point in the pipeline in one
> processing step I need a side input to enrich the processed data.
> 
> This side input should update itself once a day and is created using
> the example in the documentation
> https://beam.apache.org/documentation/patterns/side-inputs/ section:
> "Slowly updating global window side inputs"
> 
> To make sure that everything works as expected I would like to write a
> Junit test to make sure the side input updates regularly and the new
> arriving data indeed gets the updated values from the side input.
> 
> Here is the code for what I am trying to do: https://pastebin.com/8mtPKTcv
> 
> The result I'm getting is that:
> - The SideInput is triggered
> - Processing starts
> - all elements get processed up to the point of the processing step
> that actually needs the side input then it blocks
> - sideInput gets triggered again
> - processing still blocked
> 
> Could anyone please explain to me where I'm wrong. I already tried
> several things like introducing fixed windows to the main-stream but
> no luck so far... also I couldn't find much information using google.
> 
> I also printed the ctx.timestamp() of the processings here is how they look:
> 2021-09-10T13:03:10.021Z sideloaded elements: 1767
> 2021-09-10T13:03:07.072Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:07.073Z Filter called
> 2021-09-10T13:03:30.022Z sideloaded elements: 1767
> 
> Your help would be really appreciated.
> 
> Thanks and regards