Re: Generating Heartbeats Using Looping Timer

2022-07-08 Thread gaurav mishra
Maybe the previous post was too verbose, so I will try to summarize my
question:
If one instance of a DoFn tries to set a timer for a time that is behind
the pipeline's watermark, can this cause the pipeline to stall for other
keys as well? By "stall" I mean that other keys' timers will start lagging
behind.
Say there are 1 million keys in a steady state (behaving as expected),
with timers firing at 5-minute boundaries.
One bad key arrives and sets its timer to a time which is 1 hour older
than the current watermark. What happens here? My understanding is this:
the looping timer will fire back to back in quick succession for this bad
key 12 times, and after that this key joins the group of 1 million keys
that were firing regularly at 5-minute boundaries.
PS - The above DoFn uses the default GlobalWindows and the default trigger.
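
To make the arithmetic concrete, here is a small sketch of the catch-up
behavior I am assuming (an assumption on my part, not something I have
verified against the runner):

long gapSeconds = 3600;    // timer was set 1 hour behind the watermark
long periodSeconds = 300;  // looping-timer period, 5 minutes
// each firing re-arms the timer 5 minutes later, so the overdue firings
// should be delivered back to back until the timer passes the watermark
long backToBackFirings = gapSeconds / periodSeconds;  // = 12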


On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra 
wrote:

> Hello,
> I have a pipeline which is generating heartbeats using looping timers in a
> stateful dofn. Following is pseudo code for the process element and onTimer
> methods
>
> StateSpec<ValueState<Msg>> lastSeenMsg = StateSpecs.value(...);
> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>
> processElement(input) {
>   // read the event time embedded in the message
>   Instant currentEventTime = input.getEventTimeEpoch();
>   if (input.state == ONLINE) {
>     lastSeenMsg.write(input);
>     // calculate the start of the looping timer,
>     // which will be the next 5-minute boundary
>     long currentEventTimeEpochSeconds = currentEventTime.getMillis() / 1000;
>     long offset = currentEventTimeEpochSeconds % 300;
>     long nextFireTimeSeconds = currentEventTimeEpochSeconds - offset + 300;
>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>   } else {
>     // stop heartbeats when the entity goes offline
>     loopingTimer.clear();
>   }
> }
>
>
> onTimer() {
>   // emit the last seen message as the heartbeat
>   output(lastSeenMsg.read());
>
>   // re-arm the timer for the next 5-minute boundary
>   loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
> }
>
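> For reference, here is a minimal self-contained sketch of the same DoFn
> with the @StateId/@TimerId wiring the Beam Java SDK expects. The Msg
> class, its state/getEventTimeEpoch members, and the SerializableCoder
> choice are placeholders of mine, not the real classes:
>
> // imports: org.apache.beam.sdk.state.*, org.apache.beam.sdk.transforms.DoFn,
> // org.apache.beam.sdk.coders.SerializableCoder, org.apache.beam.sdk.values.KV,
> // org.joda.time.Duration, org.joda.time.Instant
> class HeartbeatFn extends DoFn<KV<String, Msg>, Msg> {
>
>   @StateId("lastSeenMsg")
>   private final StateSpec<ValueState<Msg>> lastSeenMsgSpec =
>       StateSpecs.value(SerializableCoder.of(Msg.class));
>
>   @TimerId("loopingTimer")
>   private final TimerSpec loopingTimerSpec =
>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void processElement(
>       @Element KV<String, Msg> element,
>       @StateId("lastSeenMsg") ValueState<Msg> lastSeenMsg,
>       @TimerId("loopingTimer") Timer loopingTimer) {
>     Msg input = element.getValue();
>     if (input.state == State.ONLINE) {
>       lastSeenMsg.write(input);
>       // align the first firing to the next 5-minute boundary
>       long eventSeconds = input.getEventTimeEpoch().getMillis() / 1000;
>       long nextFireSeconds = eventSeconds - (eventSeconds % 300) + 300;
>       loopingTimer.set(Instant.ofEpochSecond(nextFireSeconds));
>     } else {
>       loopingTimer.clear();  // stop heartbeats when the entity goes offline
>     }
>   }
>
>   @OnTimer("loopingTimer")
>   public void onTimer(
>       OnTimerContext context,
>       @StateId("lastSeenMsg") ValueState<Msg> lastSeenMsg,
>       @TimerId("loopingTimer") Timer loopingTimer,
>       OutputReceiver<Msg> out) {
>     Msg last = lastSeenMsg.read();
>     if (last != null) {
>       out.output(last);  // emit the heartbeat
>     }
>     // re-arm for the next 5-minute boundary
>     loopingTimer.set(context.timestamp().plus(Duration.standardSeconds(300)));
>   }
> }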
>
> The above pipeline works well in low-load scenarios. But in one of my
> heavy-traffic deployments the pipeline seems unable to keep up with the
> load. Input messages from Pub/Sub are state-change events for an entity -
> Entity Online or Entity Offline messages. Once an entity comes Online we
> start generating a heartbeat every 5 minutes as long as we do not
> encounter an Offline message for that entity. The number of online
> entities can be fairly large; more than 10 million can be Online at a
> given time.
>
> I am seeing that this particular DoFn starts lagging behind as soon as it
> starts. The timers are firing quite late; the lag went up to 48 hours
> before I restarted the pipeline. Is there something wrong in what I am
> doing?
> Note - I am reading the eventTime embedded in the msg. The intent is to
> fire a bunch of timers in quick succession if needed and fill up the DB
> with heartbeats up to the current time.
> So say a msg comes with state = Online and time = 10:02 AM, and the
> current watermark is at 10:13 AM. I set the loopingTimer to start at
> 10:05, which I expect to fire immediately since the watermark is already
> ahead of this time (or is this understanding wrong?). Similarly, the
> subsequent call to the onTimer method will set the next timer to fire at
> 10:10, which I also expect to fire immediately. After that point this
> DoFn should start emitting at the same time as all other instances of
> this DoFn. Is there a mistake in this implementation?
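>
> A quick worked version of the 10:02 example, using the same arithmetic
> as processElement (the timestamps are the ones above):
>
> long epochSeconds = eventTime.getMillis() / 1000; // event time = 10:02:00
> long offset = epochSeconds % 300;                 // 120 s past 10:00
> long nextFire = epochSeconds - offset + 300;      // first firing at 10:05:00
> // the watermark is already at 10:13, so the 10:05 and 10:10 timers
> // should both be eligible to fire as soon as they are set
>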
> Another thing I am noticing is that this pipeline is running on a single
> Dataflow worker and not scaling up automatically. For such a large key
> space (10 million keys and their timers) I expected the pipeline to use a
> lot of CPU near the 5-minute boundaries and scale up, but that is also
> not happening.
>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Evan Galpin
Ya fair enough, makes sense. I’ll reach out to GCP. Thanks Luke!

- Evan

On Fri, Jul 8, 2022 at 11:24 Luke Cwik  wrote:

> I was suggesting GCP support mainly because I don't think you want to
> share the 2.36 and 2.40 versions of your job file publicly, as someone
> familiar with the layout and format may spot a meaningful difference.
>
> Also, if it turns out that there is no meaningful difference between the
> two, then the internal mechanics of how the graph is modified by Dataflow
> are not surfaced back to you in enough depth to debug further.
>
>
>
> On Fri, Jul 8, 2022 at 6:12 AM Evan Galpin  wrote:
>
>> Thanks for your response Luke :-)
>>
>> Updating in 2.36.0 works as expected, but as you alluded to I'm
>> attempting to update to the latest SDK; in this case there are no code
>> changes in the user code, only the SDK version.  Is GCP support the only
>> tool when it comes to deciphering the steps added by Dataflow?  I would
>> love to be able to inspect the complete graph with those extra steps like
>> "Unzipped-2/FlattenReplace" that aren't in the job file.
>>
>> Thanks,
>> Evan
>>
>> On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
>> wrote:
>>
>>> Does doing a pipeline update in 2.36 work or do you want to do an update
>>> to get the latest version?
>>>
>>> Feel free to share the job files with GCP support. It could be something
>>> internal but the coders for ephemeral steps that Dataflow adds are based
>>> upon existing coders within the graph.
>>>
>>> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>>>
 +dev@

 Reviving this thread as it has hit me again on Dataflow.  I am trying
 to upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally,
 I received an error that the step "Flatten.pCollections" was missing from
 the new job graph.  I knew from the code that that wasn't true, so I dumped
 the job file via "--dataflowJobFile" for both the running pipeline and for
 the new version I'm attempting to update to.  Both job files showed
 identical data for the Flatten.pCollections step, which raises the question
 of why that would have been reported as missing.

 Out of curiosity I then tried mapping the step to the same name, which
 changed the error to:  "The Coder or type for step
 Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
 job files show identical coders for the Flatten step (though
 "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
 internal Dataflow thing?), so I'm confident that the coder hasn't actually
 changed.

 I'm not sure how to proceed in updating the running pipeline, and I'd
 really prefer not to drain.  Any ideas?

 Thanks,
 Evan


 On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
 wrote:

> Thanks for the ideas Luke. I checked out the json graphs as per your
> recommendation (thanks for that, was previously unaware), and the
> "output_info" was identical for both the running pipeline and the pipeline
> I was hoping to update it with.  I ended up opting to just drain and 
> submit
> the updated pipeline as a new job.  Thanks for the tips!
>
> Thanks,
> Evan
>
> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>
>> I would suggest dumping the JSON representation (with the
>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>> graph representation is a bipartite graph where there are transform nodes
>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>> The PCollection nodes typically end with the suffix ".out". This could 
>> help
>> find steps that have been added/removed/renamed.
>>
>> The PipelineDotRenderer[1] might be of use as well.
>>
>> 1:
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
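>>
>> As a minimal sketch of using the renderer (assuming you already have the
>> constructed Pipeline object in hand; the output path and rendering
>> command are arbitrary):
>>
>> import java.nio.charset.StandardCharsets;
>> import java.nio.file.Files;
>> import java.nio.file.Paths;
>> import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer;
>>
>> String dot = PipelineDotRenderer.toDotString(pipeline);
>> Files.write(Paths.get("/tmp/pipeline.dot"),
>>     dot.getBytes(StandardCharsets.UTF_8));
>> // view with e.g.: dot -Tpng /tmp/pipeline.dot -o pipeline.png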
>>
>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm looking for any help regarding updating streaming jobs which are
>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>> situations where Fusion is involved, and trying to decipher which old 
>>> steps
>>> should be mapped to which new steps.
>>>
>>> I have a case where I updated the steps which come after the step in
>>> question, but when I attempt to update there is an error that "<step>
>>> no longer produces data to the steps <downstream steps>". I believe that
>>> <downstream steps> is only changed as a result of fusion, and in reality
>>> it does in fact produce data to <downstream steps> (confirmed when
>>> deployed as a new job for testing purposes).
>>>
>>> Is there a guide for how to deal with updates and fusion?

Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Luke Cwik via user
I was suggesting GCP support mainly because I don't think you want to share
the 2.36 and 2.40 versions of your job file publicly, as someone familiar
with the layout and format may spot a meaningful difference.

Also, if it turns out that there is no meaningful difference between the
two, then the internal mechanics of how the graph is modified by Dataflow
are not surfaced back to you in enough depth to debug further.



On Fri, Jul 8, 2022 at 6:12 AM Evan Galpin  wrote:

> Thanks for your response Luke :-)
>
> Updating in 2.36.0 works as expected, but as you alluded to I'm attempting
> to update to the latest SDK; in this case there are no code changes in the
> user code, only the SDK version.  Is GCP support the only tool when it
> comes to deciphering the steps added by Dataflow?  I would love to be able
> to inspect the complete graph with those extra steps like
> "Unzipped-2/FlattenReplace" that aren't in the job file.
>
> Thanks,
> Evan
>
> On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
> wrote:
>
>> Does doing a pipeline update in 2.36 work or do you want to do an update
>> to get the latest version?
>>
>> Feel free to share the job files with GCP support. It could be something
>> internal but the coders for ephemeral steps that Dataflow adds are based
>> upon existing coders within the graph.
>>
>> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>>
>>> +dev@
>>>
>>> Reviving this thread as it has hit me again on Dataflow.  I am trying to
>>> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
>>> received an error that the step "Flatten.pCollections" was missing from the
>>> new job graph.  I knew from the code that that wasn't true, so I dumped the
>>> job file via "--dataflowJobFile" for both the running pipeline and for the
>>> new version I'm attempting to update to.  Both job files showed identical
>>> data for the Flatten.pCollections step, which raises the question of why
>>> that would have been reported as missing.
>>>
>>> Out of curiosity I then tried mapping the step to the same name, which
>>> changed the error to:  "The Coder or type for step
>>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>>> job files show identical coders for the Flatten step (though
>>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>>> changed.
>>>
>>> I'm not sure how to proceed in updating the running pipeline, and I'd
>>> really prefer not to drain.  Any ideas?
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>>> wrote:
>>>
 Thanks for the ideas Luke. I checked out the json graphs as per your
 recommendation (thanks for that, was previously unaware), and the
 "output_info" was identical for both the running pipeline and the pipeline
 I was hoping to update it with.  I ended up opting to just drain and submit
 the updated pipeline as a new job.  Thanks for the tips!

 Thanks,
 Evan

 On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:

> I would suggest dumping the JSON representation (with the
> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
> and looking to see what is being submitted to Dataflow. Dataflow's JSON
> graph representation is a bipartite graph where there are transform nodes
> with inputs and outputs and PCollection nodes with no inputs or outputs.
> The PCollection nodes typically end with the suffix ".out". This could 
> help
> find steps that have been added/removed/renamed.
>
> The PipelineDotRenderer[1] might be of use as well.
>
> 1:
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>
> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
> wrote:
>
>> Hi all,
>>
>> I'm looking for any help regarding updating streaming jobs which are
>> already running on Dataflow.  Specifically I'm seeking guidance for
>> situations where Fusion is involved, and trying to decipher which old 
>> steps
>> should be mapped to which new steps.
>>
>> I have a case where I updated the steps which come after the step in
>> question, but when I attempt to update there is an error that "<step>
>> no longer produces data to the steps <downstream steps>". I believe that
>> <downstream steps> is only changed as a result of fusion, and in reality
>> it does in fact produce data to <downstream steps> (confirmed when
>> deployed as a new job for testing purposes).
>>
>> Is there a guide for how to deal with updates and fusion?
>>
>> Thanks,
>> Evan
>>
>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Evan Galpin
Thanks for your response Luke :-)

Updating in 2.36.0 works as expected, but as you alluded to I'm attempting
to update to the latest SDK; in this case there are no code changes in the
user code, only the SDK version.  Is GCP support the only tool when it
comes to deciphering the steps added by Dataflow?  I would love to be able
to inspect the complete graph with those extra steps like
"Unzipped-2/FlattenReplace" that aren't in the job file.

Thanks,
Evan

On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
wrote:

> Does doing a pipeline update in 2.36 work or do you want to do an update
> to get the latest version?
>
> Feel free to share the job files with GCP support. It could be something
> internal but the coders for ephemeral steps that Dataflow adds are based
> upon existing coders within the graph.
>
> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>
>> +dev@
>>
>> Reviving this thread as it has hit me again on Dataflow.  I am trying to
>> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
>> received an error that the step "Flatten.pCollections" was missing from the
>> new job graph.  I knew from the code that that wasn't true, so I dumped the
>> job file via "--dataflowJobFile" for both the running pipeline and for the
>> new version I'm attempting to update to.  Both job files showed identical
>> data for the Flatten.pCollections step, which raises the question of why
>> that would have been reported as missing.
>>
>> Out of curiosity I then tried mapping the step to the same name, which
>> changed the error to:  "The Coder or type for step
>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>> job files show identical coders for the Flatten step (though
>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>> changed.
>>
>> I'm not sure how to proceed in updating the running pipeline, and I'd
>> really prefer not to drain.  Any ideas?
>>
>> Thanks,
>> Evan
>>
>>
>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>> wrote:
>>
>>> Thanks for the ideas Luke. I checked out the json graphs as per your
>>> recommendation (thanks for that, was previously unaware), and the
>>> "output_info" was identical for both the running pipeline and the pipeline
>>> I was hoping to update it with.  I ended up opting to just drain and submit
>>> the updated pipeline as a new job.  Thanks for the tips!
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>>
 I would suggest dumping the JSON representation (with the
 --dataflowJobFile=/path/to/output.json) of the pipeline before and after
 and looking to see what is being submitted to Dataflow. Dataflow's JSON
 graph representation is a bipartite graph where there are transform nodes
 with inputs and outputs and PCollection nodes with no inputs or outputs.
 The PCollection nodes typically end with the suffix ".out". This could help
 find steps that have been added/removed/renamed.

 The PipelineDotRenderer[1] might be of use as well.

 1:
 https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java

 On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
 wrote:

> Hi all,
>
> I'm looking for any help regarding updating streaming jobs which are
> already running on Dataflow.  Specifically I'm seeking guidance for
> situations where Fusion is involved, and trying to decipher which old 
> steps
> should be mapped to which new steps.
>
> I have a case where I updated the steps which come after the step in
> question, but when I attempt to update there is an error that "<step>
> no longer produces data to the steps <downstream steps>". I believe that
> <downstream steps> is only changed as a result of fusion, and in reality
> it does in fact produce data to <downstream steps> (confirmed when
> deployed as a new job for testing purposes).
>
> Is there a guide for how to deal with updates and fusion?
>
> Thanks,
> Evan
>



Re: Any guideline for building golang connector?

2022-07-08 Thread Yu Watanabe
Hello Danny.

Thank you for the details. I appreciate your message.

I am a newbie at building IOs, so I will first look into the links and
build up my knowledge.

Thanks,
Yu Watanabe

On Fri, Jul 8, 2022 at 8:28 PM Danny McCormick via user <
user@beam.apache.org> wrote:

> Hey Yu,
>
> The guidance on that page should generally apply for Go as well, though we
> are missing an example transform; I filed
> https://github.com/apache/beam/issues/22194 to fix this, but a couple
> examples are our textio implementation
> 
> and this native streaming example
> 
> (non-productionized, but it shows a lot of good concepts for handling
> streaming sources).
>
> Another option would be to use Java's elasticsearch implementation
> 
> with a cross language transform. If you're thinking of contributing this
> back to the main beam code base, I'd probably recommend that approach. In
> general, we're currently investing most heavily in cross language
> transforms because it's a much lower burden to build/support since it
> reuses the main components of the original transform. There are several
> examples of wrapped cross-language transforms in
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/io/xlang. If
> you have specific questions about authoring IOs that aren't answered in the
> docs/by example, feel free to drop them in this thread as well!
>
> Thanks,
> Danny
>
> On Fri, Jul 8, 2022 at 3:51 AM Yu Watanabe  wrote:
>
>> Hello.
>>
>> Is there any guideline for building a Go SDK connector?
>> I was reviewing the document but I could not find one for golang.
>>
>> https://beam.apache.org/documentation/io/developing-io-overview/
>>
>> I was thinking of building one for elasticsearch.
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Any guideline for building golang connector?

2022-07-08 Thread Danny McCormick via user
Hey Yu,

The guidance on that page should generally apply for Go as well, though we
are missing an example transform; I filed
https://github.com/apache/beam/issues/22194 to fix this, but a couple
examples are our textio implementation

and this native streaming example

(non-productionized, but it shows a lot of good concepts for handling
streaming sources).

Another option would be to use Java's elasticsearch implementation

with a cross language transform. If you're thinking of contributing this
back to the main beam code base, I'd probably recommend that approach. In
general, we're currently investing most heavily in cross language
transforms because it's a much lower burden to build/support since it
reuses the main components of the original transform. There are several
examples of wrapped cross-language transforms in
https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/io/xlang. If
you have specific questions about authoring IOs that aren't answered in the
docs/by example, feel free to drop them in this thread as well!

Thanks,
Danny

On Fri, Jul 8, 2022 at 3:51 AM Yu Watanabe  wrote:

> Hello.
>
> Is there any guideline for building a Go SDK connector?
> I was reviewing the document but I could not find one for golang.
>
> https://beam.apache.org/documentation/io/developing-io-overview/
>
> I was thinking of building one for elasticsearch.
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>


How to setup staging, pre-prod, and production envs for dataflow jobs?

2022-07-08 Thread Shivam Singhal
Hi Community,

What is the process you follow for setting up staging, pre-prod, and prod
environments for your Dataflow jobs?

Stackoverflow question here.


Any guideline for building golang connector?

2022-07-08 Thread Yu Watanabe
Hello.

Is there any guideline for building a Go SDK connector?
I was reviewing the document but I could not find one for golang.

https://beam.apache.org/documentation/io/developing-io-overview/

I was thinking of building one for elasticsearch.

Thanks,
Yu Watanabe

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis