How to isSet() check for a Timer

2018-11-02 Thread Reza Ardeshir Rokni
Hi,

I need to set a timer in both the @ProcessElement code block and the
@OnTimer code block. I need to ensure that I do not overwrite an already-set
timer; is there a way to check whether a Timer has been set?

One thought was to use a ValueState which I can manually set /
unset alongside the timer operations. Thoughts?
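
For illustration, a rough sketch of that idea (names are hypothetical, untested):
a ValueState<Boolean> flag that is written whenever the timer is set and cleared
when the timer fires, so both code blocks can check it first.

import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class TimerWithFlagFn extends DoFn<KV<String, String>, String> {

  @TimerId("alarm")
  private final TimerSpec alarmSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("alarmSet")
  private final StateSpec<ValueState<Boolean>> alarmSetSpec = StateSpecs.value(BooleanCoder.of());

  @ProcessElement
  public void process(
      ProcessContext c,
      @TimerId("alarm") Timer alarm,
      @StateId("alarmSet") ValueState<Boolean> alarmSet) {
    // Only set the timer if our manual flag says it is not already set.
    if (!Boolean.TRUE.equals(alarmSet.read())) {
      alarm.set(c.timestamp().plus(Duration.standardMinutes(5)));
      alarmSet.write(true);
    }
    c.output(c.element().getValue());
  }

  @OnTimer("alarm")
  public void onAlarm(
      OnTimerContext c,
      @TimerId("alarm") Timer alarm,
      @StateId("alarmSet") ValueState<Boolean> alarmSet) {
    // The timer has fired, so it is no longer "set"; clear the flag here,
    // or re-set both the timer and the flag if another alarm is wanted.
    alarmSet.write(false);
  }
}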

Cheers
Reza


Re: Inferring Csv Schemas

2018-11-30 Thread Reza Ardeshir Rokni
Hi Joe,

You may find some of the info in this blog post of interest; it's based on
streaming pipelines, but the ideas are useful here too.

https://cloud.google.com/blog/products/gcp/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix

Cheers

Reza

On Thu, 29 Nov 2018 at 06:53, Joe Cullen  wrote:

> Hi all,
>
> I have a pipeline reading CSV files, performing some transforms, and
> writing to BigQuery. At the moment I'm reading the BigQuery schema from a
> separate JSON file. If the CSV files had a new column added (and I wanted
> to include this column in the resultant BigQuery table), I'd have to change
> the JSON schema or the pipeline itself. Is there any way to autodetect the
> schema using BigQueryIO? How do people normally deal with potential changes
> to input CSVs?
>
> Thanks,
> Joe
>


Re: Joining bounded and unbounded data not working using non-global window

2018-12-10 Thread Reza Ardeshir Rokni
Hi,

A couple of thoughts:

1- If the amount of data in HBase that you need to join with is small and
does not change, could you use a Side Input? If it does change, you could
try making use of the slowly-changing lookup cache pattern (ref below).
2- If the amount of data is large, would a direct HBase client call from a
DoFn work to fetch the data you need to enrich each element? This is similar to
the "calling external services" pattern (ref below); a rough sketch follows the
reference link.

Ref :
https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
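
For thought 2, a rough sketch of the shape of that enrichment DoFn. LookupClient,
Event, Metadata and EnrichedEvent are hypothetical placeholders for your HBase
client wrapper and record types; the usual Beam SDK imports are assumed and this
is untested.

// Create the client once per DoFn instance so it is reused across bundles,
// rather than opening a connection per element.
static class EnrichFromHBaseFn extends DoFn<KV<String, Event>, KV<String, EnrichedEvent>> {

  private transient LookupClient client; // hypothetical HBase client wrapper

  @Setup
  public void setup() {
    client = LookupClient.create();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    KV<String, Event> kv = c.element();
    // Point lookup by key; consider batching or caching if throughput is high.
    Metadata meta = client.lookup(kv.getKey());
    c.output(KV.of(kv.getKey(), EnrichedEvent.of(kv.getValue(), meta)));
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
    }
  }
}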

Cheers

Reza

On Tue, 11 Dec 2018 at 00:12, Shrijit Pillai 
wrote:

> Hello,
>
> I'm trying to join an unbounded data source and a bounded one using
> CoGroupByKey. The bounded data source is HBase and the unbounded one is
> Kafka.
>
> The co-group works if the global window strategy is used but not with a
> non-global one. I've tried the accumulatingFiredPanes mode(using the
> non-global window) but that didn't help either. Am I missing something to
> make the co-group work using a non-global window like FixedWindows or is
> the GlobalWindow the only way to go about it? I'm using beam 2.8.0
>
> Here's the code snippet:
> https://gist.github.com/shrijitpillai/5e9e642f92dd23b3b7bd60e3ce8056bb
>
> Thanks
> Shrijit
>


OnTimer / OnProcess timing

2018-12-14 Thread Reza Ardeshir Rokni
Hi,

I believe a bug in my timeseries code is due to something I missed about the
ordering of OnTimer / ProcessElement in streaming mode.

If a timer has been set at the lower boundary of a window and elements
arrive in that window's keyed state, which fires first: @OnTimer or
@ProcessElement?

Cheers
Reza


Re: OnTimer / OnProcess timing

2018-12-15 Thread Reza Ardeshir Rokni
Hya Max,

Thank you for the reply and I realized I had not given enough background
context on where the timers are set. So a little more color...

1- Apply a Fixed Window of X to the stream.
2- A Combiner generates aggregators per key.
3- Apply the Global Window to the aggregators. (It is understood that the order
of aggregators is not guaranteed in the Global Window.)
4- A 'looping timer' is created when a key is first seen. If keyed state is
null, a timer is set to the next aggregator's lower window boundary.
...
There is actually a lot more to the pipeline than just those steps as it
deals with the out of order problem and other subtle fun bits...

However, your answer did reveal the logic mistake I had made. In step 4 I
need to set the timer to the upper boundary instead, as I want it to fire
only after the watermark has moved past the end of my fixed window. In
batch-mode testing this was not an issue because, as I understand it, all
timers fire after @ProcessElement has completed; but in streaming mode my
mistake showed up. Fun stuff.
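
For illustration, a rough sketch of the corrected step 4 as a fragment of the
looping-timer DoFn's @ProcessElement (hypothetical names, fixed window size
hard-coded, usual Beam SDK imports assumed, untested):

// Must match the size used in step 1's FixedWindows.
private static final Duration FIXED_WINDOW_SIZE = Duration.standardMinutes(1);

@ProcessElement
public void process(
    ProcessContext c,
    @StateId("timerSet") ValueState<Boolean> timerSet,
    @TimerId("loopingTimer") Timer loopingTimer) {
  if (!Boolean.TRUE.equals(timerSet.read())) {
    long size = FIXED_WINDOW_SIZE.getMillis();
    // Upper boundary of the fixed window this aggregator's timestamp falls into.
    long windowStart = c.timestamp().getMillis() - Math.floorMod(c.timestamp().getMillis(), size);
    Instant upperBoundary = new Instant(windowStart + size);
    // Fires only once the watermark has passed the end of that fixed window.
    loopingTimer.set(upperBoundary);
    timerSet.write(true);
  }
  c.output(c.element());
}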

I do need to run all the tests with the Flink runner as well; I will report
back on how it goes. :-)

Cheers

Reza

On Fri, 14 Dec 2018 at 21:15, Maximilian Michels  wrote:

> Hi Reza,
>
> You are asking about the order in which @OnTimer and @ProcessElement are
> called.
> I'm not sure about the Dataflow Runner, but in the Flink Runner there is
> no
> strict order between the two, other than the guarantees that apply to
> window
> processing and readiness of timers.
>
> To be able to set the timer in "lower bound" of the window (the minimum
> timestamp), you will have to process an element which registers the timer.
> So
> you can't guarantee the timer fires beforehand. If you meant the "upper
> bound"
> (the maximum timestamp), then it can be guaranteed that the timer fires
> last
> because the timer will fire when the watermark is moved to or past the
> maximum
> timestamp.
>
> Generally, elements will be processed as they arrive in the window. Timers
> are
> fired when they are ready. It is best not to make assumptions based on
> when
> elements arrive which belong to the same window. However, you can be sure
> that
> timers fire after they become eligible.
>
> Thanks,
> Max
>
> On 14.12.18 10:43, Reza Ardeshir Rokni wrote:
> > Hi,
> >
> > I believe a bug in my timeseries code is because of something I missed
> in the
> > sequence of OnTimer / ProcessElement when in stream mode.
> >
> > If a timer has been set at the lower boundary of a window and elements
> arrive in
> > that windows keyed state, which will fire first? The @OnTimer or
> @ProcessElement ?
> >
> > Cheers
> > Reza
>


Re: GroupByKey and number of workers

2019-01-02 Thread Reza Ardeshir Rokni
Hi Mohamed,

I believe this is related to fusion, an optimization performed by some runners.
You will find more information on fusion here; a small sketch of the split/read
pattern follows the link:

https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization
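
To make the connection concrete: the GroupByKey (often wrapped as a Reshuffle)
sits between the "split" ParDo and the "read" ParDo as a fusion break, so the
runner can redistribute the split results and scale the reading step with a
different number of workers. A rough sketch (ListShardsFn, ReadShardFn and
Record are hypothetical; usual Beam SDK imports assumed):

// Split the source description into shards, break fusion, then read each shard.
PCollection<String> shards =
    p.apply(Create.of("source-descriptor"))
     .apply("ListShards", ParDo.of(new ListShardsFn()));   // emits one element per part

PCollection<Record> records =
    shards
        .apply("FusionBreak", Reshuffle.viaRandomKey())    // GroupByKey-based redistribution point
        .apply("ReadShard", ParDo.of(new ReadShardFn()));  // parts are read with independent parallelism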

Cheers

Reza

On Thu, 3 Jan 2019 at 04:09, Mohamed Haseeb  wrote:

> Hi,
>
> As per the Authoring I/O Transforms guide, the
> recommended way to implement a Read transform (from a source that can be
> read in parallel) has these steps:
> - Splitting the data into parts to be read in parallel (ParDo)
> - Reading from each of those parts (ParDo)
> - With a GroupByKey in between the ParDos
> The stated motivation for the GroupByKey is "it allows the runner to use
> different numbers of workers" for the splitting and reading parts. Can
> someone elaborate (or point to some relevant DOCs) on how the GroupByKey
> will enable using different numbers of workers for the two ParDo steps?
>
> Thanks,
> Mohamed
>


Re: Beam Summits!

2019-01-04 Thread Reza Ardeshir Rokni
Hi,

Are there any other folk here based out of Singapore, or APAC in general?

Cheers
Reza

On Fri, 4 Jan 2019 at 04:39, Austin Bennett 
wrote:

> Hi Matthias, etc,
>
> Trying to get thoughts on formalizing a process for getting proposals
> together.  I look forward to the potential day that there are many people
> that want (rather than just willing) to host a summit in a given region in
> a given year.  Perhaps too forward looking.
>
> Also, you mentioned planning London wound up with a tight time window.  If
> shooting for April in SF, seems  the clock might be starting to tick.  Any
> advice for how much time needed?  And guidance on getting whatever formal
> needed through Apache - and does this also necessarily involve a Beam PMC
> or community vote (probably more related to the first paragraph)?
>
> Thanks,
> Austin
>
> On Thu, Dec 20, 2018, 1:09 AM Matthias Baetens  wrote:
>
>> Great stuff, thanks for the overview, Austin.
>>
>> For EU, there are things to say for both Stockholm and Berlin, but I
>> think it makes sense to do it on the back of another conference (larger
>> chance of people being in town with the same interest). I like Thomas'
>> comment - we will attract more people from the US if we don't let it
>> conflict with the big events there. +1 for doing it around the time of
>> Berlin Buzzwords.
>>
>> For Asia, I'd imagine Singapore would be an option as well. I'll reach
>> out to some people that are based there to get a grasp on the size of the
>> community there.
>>
>> Best,
>> -M
>>
>>
>>
>> On Thu, 20 Dec 2018 at 05:08, Thomas Weise  wrote:
>>
>>> I think for EU there is a proposal to have it next to Berlin Buzzwords
>>> in June. That would provide better spacing and avoid conflict with
>>> ApacheCon.
>>>
>>> Thomas
>>>
>>>
>>> On Wed, Dec 19, 2018 at 3:09 PM Suneel Marthi 
>>> wrote:
>>>
 How about Beam Summit in Berlin on Sep 6 immediately following Flink
 Forward Berlin on the previous 2 days.

 Same may be for Asia also following Flink Forward Asia where and
 whenever it happens.

 On Wed, Dec 19, 2018 at 6:06 PM Austin Bennett <
 whatwouldausti...@gmail.com> wrote:

> Hi All,
>
> I really enjoyed Beam Summit in London (Thanks Matthias!), and there
> was much enthusiasm for continuations.  We had selected that location in a
> large part due to the growing community there, and we have users in a
> variety of locations.  In our 2019 calendar,
> https://docs.google.com/spreadsheets/d/1CloF63FOKSPM6YIuu8eExjhX6xrIiOp5j4zPbSg3Apo/
> shared in the past weeks, 3 Summits are tentatively slotted for this year.
> Wanting to start running this by the group to get input.
>
> * Beam Summit NA, in San Francisco, approx 3 April 2019 (following
> Flink Forward).  I can organize.
> * Beam Summit Europe, in Stockholm, this was the runner up in voting
> falling behind London.  Or perhaps Berlin?  October-ish 2019
> * Beam Summit Asia, in Tokyo ??
>
> What are general thoughts on locations/dates?
>
> Looking forward to convening in person soon.
>
> Cheers,
> Austin
>



Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Reza Ardeshir Rokni
Are you able to make use of the following pattern?

Store StateA-metadata until there has been no activity for Duration X (you can
use a Timer to check this), then expire the value but write it out to an
external system. If you later get a record that needs this value after expiry,
call out to the external system and store the value again in the
StateA-metadata state.
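
A rough sketch of that pattern (Record, Metadata and ExternalStore are
hypothetical types, the usual Beam SDK imports are assumed, untested):

static class ExpiringLookupFn extends DoFn<KV<String, Record>, KV<String, Record>> {

  private static final Duration INACTIVITY = Duration.standardDays(7); // "Duration X"

  @StateId("metadata")
  private final StateSpec<ValueState<Metadata>> metadataSpec = StateSpecs.value();

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("metadata") ValueState<Metadata> metadataState,
      @TimerId("expiry") Timer expiry) {
    Metadata metadata = metadataState.read();
    if (metadata == null) {
      // Either a type-A record carrying the metadata, or a miss after expiry
      // that needs a lookup in the external system.
      Record record = c.element().getValue();
      metadata = record.hasMetadata()
          ? record.metadata()
          : ExternalStore.lookup(c.element().getKey());
      metadataState.write(metadata);
    }
    expiry.set(c.timestamp().plus(INACTIVITY)); // activity pushes expiry forward
    c.output(KV.of(c.element().getKey(), c.element().getValue().withMetadata(metadata)));
  }

  @OnTimer("expiry")
  public void onExpiry(@StateId("metadata") ValueState<Metadata> metadataState) {
    Metadata metadata = metadataState.read();
    if (metadata != null) {
      // Assumes Metadata carries its own key; park the value outside the pipeline.
      ExternalStore.write(metadata.key(), metadata);
      metadataState.clear();
    }
  }
}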

Cheers

On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:

> Hello all,
> We are attempting to implement a use case where Beam (Java SDK) reads two
> kinds of records from a data stream like Kafka:
>
> 1. Records of type A containing key and corresponding metadata.
> 2. Records of type B containing the same key, but no metadata. Beam then
> needs to fill metadata for records of type B  by doing a lookup for
> metadata using keys received in records of type A.
>
> Idea is to save metadata or rather state for keys received in records of
> type A and then do a lookup when records of type B are received.
> I have implemented this using the "@State" construct of beam. However my
> problem is that we don't know when keys should expire. I don't think
> keeping a global window will be a good idea as there could be many keys
> (may be millions over a period of time) to be saved in a state.
>
> What is the best way to achieve this? I was reading about RedisIO, but
> found that it is still in the experimental stage. Can someone please
> recommend any solution to achieve this.
>
> Thanks and regards
> Mohil
>
>
>
>
>
>


Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Reza Ardeshir Rokni
It depends on the use case. Global-window state comes with the technical debt
of having to do your own GC, but gives you more control. You could
implement the pattern above with a long FixedWindow as well, which will
take care of the GC at the window bound.

Sorry, it's not a yes / no answer :-)

On Tue, 7 Apr 2020 at 09:03, Mohil Khare  wrote:

> Thanks a lot Reza for your quick response. Yeah saving the data in an
> external system after timer expiry makes sense.
> So do you suggest using a global window for maintaining state ?
>
> Thanks and regards
> Mohil
>
> On Mon, Apr 6, 2020 at 5:37 PM Reza Ardeshir Rokni 
> wrote:
>
>> Are you able to make use of the following pattern?
>>
>> Store StateA-metadata until no activity for Duration X, you can use a
>> Timer to check this, then expire the value, but store in an
>> external system. If you get a record that does want this value after
>> expiry, call out to the external system and store the value again in key
>> StateA-metadata.
>>
>> Cheers
>>
>> On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:
>>
>>> Hello all,
>>> We are attempting a implement a use case where beam (java sdk) reads two
>>> kind of records from data stream like Kafka:
>>>
>>> 1. Records of type A containing key and corresponding metadata.
>>> 2. Records of type B containing the same key, but no metadata. Beam then
>>> needs to fill metadata for records of type B  by doing a lookup for
>>> metadata using keys received in records of type A.
>>>
>>> Idea is to save metadata or rather state for keys received in records of
>>> type A and then do a lookup when records of type B are received.
>>> I have implemented this using the "@State" construct of beam. However my
>>> problem is that we don't know when keys should expire. I don't think
>>> keeping a global window will be a good idea as there could be many keys
>>> (may be millions over a period of time) to be saved in a state.
>>>
>>> What is the best way to achieve this? I was reading about RedisIO, but
>>> found that it is still in the experimental stage. Can someone please
>>> recommend any solution to achieve this.
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>>
>>>
>>>
>>>
>>>


Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Reza Ardeshir Rokni
Great! BTW, if you get the time and want to contribute back to Beam, there
is a nice section for recording cool patterns:

https://beam.apache.org/documentation/patterns/overview/

This would make a great one!

On Tue, 7 Apr 2020 at 09:12, Mohil Khare  wrote:

> No ... that's a valid answer. Since I wanted to have a long window size
> per key and since we can't use state with session windows, I am using a
> sliding window for let's say 72 hrs which starts every hour.
>
> Thanks a lot Reza for your input.
>
> Regards
> Mohil
>
> On Mon, Apr 6, 2020 at 6:09 PM Reza Ardeshir Rokni 
> wrote:
>
>> Depends on the use case, Global state comes with the technical debt of
>> having to do your own GC, but comes with more control. You could
>> implement the pattern above with a long FixedWindow as well, which will
>> take care of the GC within the window  bound.
>>
>> Sorry, its not a yes / no answer :-)
>>
>> On Tue, 7 Apr 2020 at 09:03, Mohil Khare  wrote:
>>
>>> Thanks a lot Reza for your quick response. Yeah saving the data in an
>>> external system after timer expiry makes sense.
>>> So do you suggest using a global window for maintaining state ?
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>> On Mon, Apr 6, 2020 at 5:37 PM Reza Ardeshir Rokni 
>>> wrote:
>>>
>>>> Are you able to make use of the following pattern?
>>>>
>>>> Store StateA-metadata until no activity for Duration X, you can use a
>>>> Timer to check this, then expire the value, but store in an
>>>> external system. If you get a record that does want this value after
>>>> expiry, call out to the external system and store the value again in key
>>>> StateA-metadata.
>>>>
>>>> Cheers
>>>>
>>>> On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:
>>>>
>>>>> Hello all,
>>>>> We are attempting a implement a use case where beam (java sdk) reads
>>>>> two kind of records from data stream like Kafka:
>>>>>
>>>>> 1. Records of type A containing key and corresponding metadata.
>>>>> 2. Records of type B containing the same key, but no metadata. Beam
>>>>> then needs to fill metadata for records of type B  by doing a lookup for
>>>>> metadata using keys received in records of type A.
>>>>>
>>>>> Idea is to save metadata or rather state for keys received in records
>>>>> of type A and then do a lookup when records of type B are received.
>>>>> I have implemented this using the "@State" construct of beam. However
>>>>> my problem is that we don't know when keys should expire. I don't think
>>>>> keeping a global window will be a good idea as there could be many keys
>>>>> (may be millions over a period of time) to be saved in a state.
>>>>>
>>>>> What is the best way to achieve this? I was reading about RedisIO, but
>>>>> found that it is still in the experimental stage. Can someone please
>>>>> recommend any solution to achieve this.
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Re: Non-trivial joins examples

2020-05-03 Thread Reza Ardeshir Rokni
A couple of things that are really nice here:

1- Domain specific (CTR in your example). We may find that eventually it's
not possible / practical to build out generic joins for all situations, but
with the primitives available in Beam and good 'patterns', domain-specific
joins could be added for different industries.

2- Pros / cons section. This is very nice, and as Kenn mentioned it would be
great for there to be a collection of joins that users can choose from
based on the pros / cons.

I got pulled onto other work before I could complete this PR (LINK), for
example, but I hope to go back to it; it's specific to a time-series use case
from a specific industry, with pros and cons based on throughput etc.

Maybe we should consider adding something with links etc to Beam
patterns...

https://beam.apache.org/documentation/patterns/overview/

Perhaps a Joins section, where we do something that has not been done before,
and add an Industry / Domain flavour.

Cheers

Reza

On Sat, 2 May 2020 at 14:45, Marcin Kuthan  wrote:

> @Kenneth - thanks for your response; for sure I was inspired a lot by
> earlier discussions on the group and the latest documentation updates about
> Timers:
> https://beam.apache.org/documentation/programming-guide/#timers
>
> In the limitations I forgot to mention SideInputs; they work quite
> well for scenarios where one side of the join is updated slowly, very
> slowly. But for scenarios where the main stream gets 50k+ events per
> second and the joined stream ~100 events per second it simply does not
> work, especially if there is no support for updates in a Map side input and
> the side input has to be updated/broadcast as a whole.
>
> @Jan - very interesting. As I understand it, the joins are already implemented
> (plenty of them in Scio: classic ones, sparse versions, etc.); the problem
> is with the limited window semantics, triggering policy and the timestamps of
> emitted events.
>
> Please look at LookupCacheDoFn; it looks like a left outer join - but it
> isn't. Only the latest lookup value (right side of the join) is cached, and
> the left side of the join is cached only until the first matching lookup is
> observed. Not so generic, but quite efficient.
>
>
> https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
>
> Marcin
>
> On Fri, 1 May 2020 at 22:22, Jan Lukavský  wrote:
>
>> Interestingly, I'm currently also working on a proposal for generic join
>> semantics. I plan to send a proposal for review, but unfortunately, there
>> are still other things keeping me busy. I take this opportunity to review
>> high-level thoughts, maybe someone can give some points.
>>
>> The general idea is to define a join that can incorporate all other types
>> as special cases, where the generic implementation can be simplified or
>> optimized, but the semantics remain the same. As I plan to put this down to
>> a full design document I will just very roughly outline ideas:
>>
>>  a) the generic semantics should be equivalent to running a relational
>> join against the set of tables _after each individual modification of the
>> relation_ and producing results with the timestamp of the last modification
>>
>>  b) windows "scope" state of each "table" - i.e. when time reaches
>> window.maxTimestamp() corresponding "table" is cleared
>>
>>  c) it should be possible to derive other types of joins from this
>> definition by certain manipulations (e.g. buffering multiple updates in
>> a single window and assigning all elements the timestamp of
>> window.maxTimestamp() will yield the classical "windowed join" with the
>> requirement to have same windows on both (all) sides as otherwise the
>> result will be empty) - the goal of these modification is typically
>> enabling some optimization (e.g. the fully generic implementation must
>> include time sorting - either implicitly or explicitly, optimized variants
>> can drop this requirement).
>>
>> It would be great if someone has any comments on this bottom-up approach.
>>
>> Jan
>> On 5/1/20 5:30 PM, Kenneth Knowles wrote:
>>
>> +dev @beam and some people who I talk about joins
>> with
>>
>> Interesting! It is a lot to take in and fully grok the code, so calling
>> in reinforcements...
>>
>> Generally, I think there's agreement that for a lot of real use cases,
>> you have to roll your own join using the lower level Beam primitives. So I
>> think it would be great to get some of these other approaches to joins into
>> Beam, perhaps as an extension of the Java SDK or even in the core (since
>> schema joins are in the core). In particular:
>>
>>  - "join in fixed window with repeater" sounds similar (but not
>> identical) to work by Mikhail
>>  - "join in global window with cache" sounds similar (but not identical)
>> to work and discussions w/ Reza and Tyson
>>
>> I want to be clear that I am *not* saying there's any duplication. I'm
>> guessing these all fit into a collection of di

Re: PubSub latency for Beam pipeline on Flink runner

2020-05-12 Thread Reza Ardeshir Rokni
Hi Vincent,

Did you mean <=3000 or did you want that to be <=3?

Cheers
Reza

On Fri, 8 May 2020 at 04:23, Vincent Domingues <
vincent.doming...@dailymotion.com> wrote:

> Hi all,
>
> We are trying to work with Beam on the Flink runner to consume PubSub messages.
>
> We are facing latency issues even with very low PubSub throughput.
>
> For example if you try the following simple beam pipeline consuming a
> PubSub subscription :
>
>
> ---
>
> package org.apache.beam.examples;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.metrics.Counter;
> import org.apache.beam.sdk.metrics.Distribution;
> import org.apache.beam.sdk.metrics.Metrics;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.joda.time.Instant;
>
> import java.util.logging.Logger;
>
> public class PubSubBeam {
>
> private final static Logger LOGGER =
> Logger.getLogger(PubSubBeam.class.getName());
>
> public interface MyOptions extends PipelineOptions {
> @Description("Topic to use in PubSub")
> String getInputSubscription();
> void setInputSubscription(String value);
> }
>
> static class LogMessages extends DoFn<String, String> {
> private final Distribution distribution =
> Metrics.distribution(this.getClass().getName(), "latency-distribution");
> private final Counter counter_30 =
> Metrics.counter(this.getClass().getName(), "0s_30s");
> private final Counter counter_60 =
> Metrics.counter(this.getClass().getName(), "30s_60s");
> private final Counter counter_90 =
> Metrics.counter(this.getClass().getName(), "60s_90s");
> private final Counter counter_120 =
> Metrics.counter(this.getClass().getName(), "90s_120s");
> private final Counter counter_240 =
> Metrics.counter(this.getClass().getName(), "120s_240s");
> private final Counter counter_inf =
> Metrics.counter(this.getClass().getName(), "240s_infs");
> private final Counter total =
> Metrics.counter(this.getClass().getName(), "total");
>
> @ProcessElement
> public void processElement(ProcessContext c) {
> Long latency = Instant.now().getMillis() -
> c.timestamp().getMillis();
> if (latency <= 3000){
> counter_30.inc();
> }
> else if (latency <= 6){
> counter_60.inc();
> }
> else if (latency <= 9){
> counter_90.inc();
> }
> else if (latency <= 12){
> counter_120.inc();
> }
> else if (latency <= 24){
> counter_240.inc();
> }
> else if (latency > 24){
> counter_inf.inc();
> }
> total.inc();
> distribution.update(latency);
> }
> }
>
> public static void main(String[] args) {
> MyOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
> Pipeline p = Pipeline.create(options);
>
> p.apply("Read PubSub Messages",
> PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
>  .apply(ParDo.of(new LogMessages()));
>
> p.run();
> }
> }
>
>
> ---
>
> If you retrieve the pipeline metrics, you'll get this latency distribution:
>
> 0s_30s: 31.81%
> 30s_60s: 64.89%
> 60s_90s: 2.73%
> 90s_120s: 0.44%
> 120s_240s: 0.13%
> 240s_infs: 0.01%
> total: 100.00%
>
>
> With almost no operations in our pipeline, 64% of our messages need between
> 30 and 60 seconds to get acknowledged.
>
> Has someone already faced this situation, or is it a known issue?
> I am interested on any clue to deal with this latency issue.
>
> Thanks for your help
> Stay safe
>
> Regards,
> Vincent
>


Re: Re: Join daily update Bigquery table with pubsub topic message

2020-05-23 Thread Reza Ardeshir Rokni
If things fit in memory, please have a look at the following pattern:

https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-global-window-side-inputs

Note there is a nicer API coming for this pattern (a minimal sketch of the
current pattern, adapted to this use case, follows the JIRA link):

https://issues.apache.org/jira/browse/BEAM-9650
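
Something like this, where loadBigQueryTable() is a hypothetical helper that
re-reads the lookup table (e.g. via the BigQuery client); the usual Beam SDK
imports are assumed and it is untested:

// Refresh the lookup map once a day and expose it as a singleton side input.
PCollectionView<Map<String, TableRow>> lookupView =
    p.apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardDays(1)))
     .apply("GlobalWindowForSideInput",
         Window.<Long>into(new GlobalWindows())
             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
             .discardingFiredPanes()
             .withAllowedLateness(Duration.ZERO))
     .apply("ReloadTable", ParDo.of(new DoFn<Long, Map<String, TableRow>>() {
       @ProcessElement
       public void process(OutputReceiver<Map<String, TableRow>> out) {
         out.output(loadBigQueryTable()); // hypothetical: returns key -> row
       }
     }))
     .apply(Latest.globally())
     .apply(View.asSingleton());

// The unbounded Pub/Sub stream looks up the latest table contents per element.
p.apply("ReadPubSub", PubsubIO.readStrings().fromSubscription("projects/p/subscriptions/s"))
 .apply("JoinWithLookup", ParDo.of(new DoFn<String, TableRow>() {
   @ProcessElement
   public void process(ProcessContext c) {
     Map<String, TableRow> lookup = c.sideInput(lookupView);
     // ... enrich the Pub/Sub message with the matching row, then output ...
   }
 }).withSideInputs(lookupView));
// ... then write the joined result with BigQueryIO.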




On Sun, 24 May 2020 at 11:25, 杨胜  wrote:

>
> How large is the BigQuery table? Does it fit in memory?
>
> 10 columns (each column data is small), 800,000 rows of data. I believe
> these data should be easily fitted into the memory.
>
>
> At 2020-05-24 11:13:04, "Reuven Lax"  wrote:
>
> How large is the BigQuery table? Does it fit in memory?
>
> On Sat, May 23, 2020 at 7:01 PM 杨胜  wrote:
>
>> Hi everyone,
>>
>> I am new to Apache Beam, but I have experience with Spark Streaming.
>>
>> I have a daily-updated BigQuery table that I want to use as a lookup
>> table: read this table into Beam as a bounded
>> PCollection and refresh this collection within Beam on a daily
>> basis; I named this variable *bigqueryTableRows*. I also have another
>> stream of Pub/Sub topic messages that I want to read as an unbounded
>> PCollection, which I named *pubsubTableRows*. Then
>> join *bigqueryTableRows* with *pubsubTableRows*, and finally write the
>> result into BigQuery.
>>
>> I have checked all the examples under beam's github repository:
>> https://github.com/apache/beam/tree/d906270f243bb4de20a7f0baf514667590c8c494/examples/java/src/main/java/org/apache/beam/examples.
>> But none matches my case.
>>
>> Any suggestion on how I should implement my pipeline?
>>
>> Many Thanks,
>> Steven
>>
>>
>>
>>
>
>
>
>


Re: Limiting elements on streaming pipelines

2020-11-03 Thread Reza Ardeshir Rokni
Hi,

You may want to use more than one element in your Create to start the
FlatMap, because with a runner that does fusion
the code will end up only being able to parallelize to 1. So make use of a
Create with, say, O(10s) of elements and have each one of those then process a
partition of the for-loop work.

Cheers
Reza

On Wed, 4 Nov 2020 at 08:57, André Rocha Silva <
a.si...@portaltelemedicina.com.br> wrote:

> Fellow users
>
> I am not very used to making streaming pipelines, but I have a batch to
> write to pub/sub.
>
> My pipeline starts with a 'fake' element only to trigger the next step.
> Then in a FlatMap I use a for loop that yields many elements. But
> in the last step I've got only 100 elements coming in.
> Should I work with windowing or something like that?
> my_pipeline = (
>     p
>     | 'Creating pipeline' >> beam.Create(['1'])
>     | 'Get things' >> beam.FlatMap(GetThings)
>     | 'Post on Pub/Sub' >> beam.io.WriteToPubSub(topic=user_options.topic.get())
> )
>
> I am working on python. Apache beam 2.17, Python 3.7
>
> Thank you for helping me!
>
> --
>
>*ANDRÉ ROCHA SILVA*
>   * DATA ENGINEER*
>   (48) 3181-0611
>
>    /andre-rocha-silva/
> 
> 
> 
> 
>
>


Re: Session window and side input

2020-12-22 Thread Reza Ardeshir Rokni
Hi,

Why the need for session windows? Could you make use of a Global Window for
the side input, as per the following pattern:

https://beam.apache.org/documentation/patterns/side-inputs/

Cheers
Reza



On Tue, 22 Dec 2020 at 01:17, Manninger, Matyas 
wrote:

> Dear Beam users,
>
> I am writing a streaming pipeline that has static tables as side inputs.
> These tables change from time to time and I want the side inputs to be
> updated at some intervals. I am planning on triggering the update by
> sending a message through pubsub. When a new message arrives, the side
> input should be updated. I would like to do this with session windows but
> on the beam documentation page session windows are depicted as lasting from
> the first input in the window to the last input in the window and the gap
> seems to not belong to any window. So if I would use this as a side
> input how would my main stream be matched to windows? If I send a signal
> every day and the gap is set to 1 hour, for example, would the window close
> after 1 hour and for the next 23 hours all the elements in the main
> stream would be matched to no side input?
>
> Thanks for any help or tips on how to solve this or what is the expected
> behaviour.
>
> BR,
> Matyas Manninger
>


Re: Session window and side input

2021-01-04 Thread Reza Ardeshir Rokni
If you can make use of the Java SDK, you can use GenerateSequence. With the
Python SDK, you should be able to use GenerateSequence via x-lang.

Do you have a Jira for the bugs with the PeriodicImpulse?

Cheers

Reza

On Mon, 4 Jan 2021 at 20:12, Manninger, Matyas 
wrote:

> Dear Reza,
>
> Thanks for the suggestion, that is the solution I was going for but
> unfortunately PeriodicImpulse has some bugs. I also posted a question about
> that in this mail list but no success there so far so I am looking for
> alternatives.
>
> On Tue, 22 Dec 2020 at 12:36, Reza Ardeshir Rokni 
> wrote:
>
>> Hi,
>>
>> Why the need for session windows? Could you make use of a Global Window
>> for the side input, as per the following pattern:
>>
>> https://beam.apache.org/documentation/patterns/side-inputs/
>>
>> Cheers
>> Reza
>>
>>
>>
>> On Tue, 22 Dec 2020 at 01:17, Manninger, Matyas <
>> matyas.mannin...@veolia.com> wrote:
>>
>>> Dear Beam users,
>>>
>>> I am writing a streaming pipeline that has static tables as side inputs.
>>> These tables change from time to time and I want the side inputs to be
>>> updated at some intervals. I am planning on triggering the update by
>>> sending a message through pubsub. When a new message arrives, the side
>>> input should be updated. I would like to do this with session windows but
>>> on the beam documentation page session windows are depicted as lasting from
>>> the first input in the window to the last input in the window and the gap
>>> seems to not belong to any window. So if I would use this as a side
>>> input how would my main stream be matched to windows? If I send a signal
>>> every day and the gap is set to 1 hour, for example, would the window close
>>> after 1 hour and for the next 23 hours all the elements in the main
>>> stream would be matched to no side input?
>>>
>>> Thanks for any help or tips on how to solve this or what is the expected
>>> behaviour.
>>>
>>> BR,
>>> Matyas Manninger
>>>
>>


Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Reza Ardeshir Rokni
Are you making use of TestStream for your Unit test?

On Wed, 13 Jan 2021 at 00:27, Raman Gupta  wrote:

> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
> if (currentTimerValue == null || currentTimerValue >
> nextTimerTimeBasedOnCurrentElement.getMillis()) {
>
>
> Restoring that condition fixes that issue.
>
> However, the reason I removed that condition in the first place was
> because it was making a unit test non-deterministic -- sometimes the
> element timestamps into the looping timer didn't seem to match the element
> timestamps according to the EARLIEST timestamp combiner defined, causing
> the timer to execute an additional time.
>
> The pipeline:
>
> input
>   // withAllowedTimestampSkew is deprecated, but as of now, there is no 
> replacement
>   // https://issues.apache.org/jira/browse/BEAM-644
>   .apply("XTimestamps", WithTimestamps
> .of { it.enteredAt.asJoda() }
> .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>   )
>   .apply("FixedTickWindows",
> Window.into(FixedWindows.of(5.minutes.asJoda()))
>   .triggering(
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>   )
>   .withAllowedLateness(3.days.asJoda(), 
> Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>   .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>   .discardingFiredPanes()
>   .withTimestampCombiner(TimestampCombiner.EARLIEST)
>   )
>   .apply("KeyByUser", WithKeys.of { it.userId })
>   .apply("GroupByUser", GroupByKey.create())
>   .apply("GlobalWindowsLoopingStatefulTimer",
> Window.into>>(GlobalWindows())
>   
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>   .discardingFiredPanes()
>   .withTimestampCombiner(TimestampCombiner.EARLIEST)
>   )
>   .apply("LoopingStatefulTimer",
> ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 
> 30).days)))
>
>
> The looping timer receives an @Timestamp value in the process function of:
>
> 1970-01-01T07:34:59.999Z
>
> but the earliest timestamp of the (single) element in the elements
> iterable is:
>
> 1970-01-01T07:30:00.000Z
>
> I would have thought given my timestamp combiners on my windows that the
> timestamp should have been 07:30:00.000Z. Is there something wrong in my
> pipeline that is causing this non-deterministic behavior?
>
> Thanks,
> Raman
>
> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský  wrote:
>
>> Hi Raman,
>>
>> can you share the details of the pipeline? How exactly are you using the
>> looping timer? Timer as described in the linked blog post should be
>> deterministic even when the order of the input elements is undefined.
>> Does you logic depend on element ordering?
>>
>>   Jan
>>
>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>> > Hello, I am building and testing a pipeline with the direct runner.
>> > The pipeline includes a looping timer -
>> > https://beam.apache.org/blog/looping-timers/.
>> >
>> > For now, I am using JdbcIO to obtain my input data, though when put
>> > into production the pipeline will use PubSubIO.
>> >
>> > I am finding that the looping timer begins producing outputs at a
>> > random event time, which makes some sense given the randomization of
>> > inputs in the direct runner. However, this makes the results of
>> > executing my pipeline with the direct runner completely
>> non-deterministic.
>> >
>> > So:
>> >
>> > 1) Is there a way to turn off this non-deterministic behavior, but
>> > just for the GlobalWindow / LoopingTimer?
>> >
>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>> > timer when the pipeline starts, rather than when it first sees an
>> > element? Perhaps a side input?
>> >
>> > 3) Am I right in assuming that when I move this pipeline to pub/sub io
>> > and operate it in streaming mode, this issue will go away?
>> >
>> > Thanks!
>> > Raman
>> >
>>
>


Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Reza Ardeshir Rokni
Hi,

Late data, in general, can be problematic for the looping timer pattern, as
well as for other use cases where the arrival of data in the @ProcessElement
function creates an event-time timer. The solution you have, which
essentially passes late data through, is a nice option; another would be
to deal with the late data in a branch upstream of the DoFn where the timer
work is happening (via a multi-output). The former is good if your
downstream transforms are not expecting specific behaviours from the
looping timer; the latter is often easier to use when you
want to write your downstream transforms without having to think about
dealing with late data at all.
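
For illustration, a rough sketch of the upstream branch via a multi-output,
placed after the GroupByKey and before the re-window into the Global Window
(tag names and element types are hypothetical, usual Beam SDK imports assumed,
untested):

final TupleTag<KV<String, Iterable<Event>>> onTimeTag = new TupleTag<KV<String, Iterable<Event>>>() {};
final TupleTag<KV<String, Iterable<Event>>> lateTag = new TupleTag<KV<String, Iterable<Event>>>() {};

PCollectionTuple branches =
    grouped.apply("BranchLateData",
        ParDo.of(new DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>() {
          @ProcessElement
          public void process(ProcessContext c, PaneInfo pane) {
            if (pane.getTiming() == PaneInfo.Timing.LATE) {
              c.output(lateTag, c.element());  // handled by its own downstream path
            } else {
              c.output(c.element());           // main output feeds the looping timer
            }
          }
        }).withOutputTags(onTimeTag, TupleTagList.of(lateTag)));

// branches.get(onTimeTag) then goes through GlobalWindows + LoopingStatefulTimer as before;
// branches.get(lateTag) can be passed through or handled with dedicated late-data logic.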

Cheers

Reza

On Wed, 13 Jan 2021 at 23:12, Jan Lukavský  wrote:

> I think there could still be problems in some corner cases. The problem
> is that elements considered 'late' by the timestamp combiner have a different
> definition than what is marked as late in PaneInfo. So you can have a
> corner case where PaneInfo would be ON_TIME, but the timestamp would still
> be shifted to the end of the window. This would probably not happen too often,
> but it can happen. If that is fine for your use case, then this could work.
>
> Jan
> On 1/13/21 3:59 PM, Raman Gupta wrote:
>
> Hmm, I think I've found a simple solution... adding this to the beginning
> of my looping timer @ProcessElement function:
>
> // late elements don't need to affect our looping timer,
> // pass them through without modification
> // this is kind of a work-around for https://issues.apache.org/jira/browse/BEAM-2262
> // but I think makes sense in general for looping timers when
> // there is no need to trigger timers after the window is done
> if (paneInfo.timing == PaneInfo.Timing.LATE) {
>   receiver.output(element)
>   return
> }
>
> At least all my unit tests are passing... is there any problem with this
> approach?
>
> Thanks,
> Raman
>
>
> On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta  wrote:
>
>> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
>> replies below.
>>
>> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> yes, there is a possible non-determinism, that is related to the
>>> timestamp combiner. Timestamp combiners combine only elements, that are not
>>> 'late' ([1]), meaning that their timestamp is not preceding output
>>> watermark of the GBK. Looking at the pipeline code I suppose that could be
>>> the cause.
>>>
>>
>> Yes, the test stream in this test case does indeed send the element in
>> question "late". Here is the setup:
>>
>> val base = Instant.EPOCH + 6.hours
>> val xStream: TestStream = TestStream.create(coder)
>>   .addElements(x["1"]) // this just initializes the looping timer
>>   // advance watermark past end of window that would normally process x2
>>   .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>>   .addElements(x["2"]) // now we see the element
>>   .advanceWatermarkToInfinity()
>>
>> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
>> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>>
>> So I get your point that the timestamp combiner is not used for late
>> elements, but if late elements are singly emitted as in this pipeline, why
>> do any timestamp modification at all? I would expect them to arrive with
>> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
>> to 1970-01-01T07:34:59.999Z (this is the part that seems
>> non-deterministic). What is the logic / reason behind the pipeline setting
>> this element's timestamp to 1970-01-01T07:34:59.999Z?
>>
>>
>>> You can make the pipeline deterministic by using
>>> TimestampCombiner.END_OF_WINDOW (default).
>>>
>>
>> It's definitely not ideal for this use case, but I'll consider it.
>>
>>
>>> If you *need* to use the TimestampCombiner.EARLIEST, you can probably do
>>> that by tweaking the looping timer stateful dofn and fix timestamps there
>>> (using timer output timestamp).
>>>
>>
>> I had already tried that but the pipeline throws an error that the
>> timestamp emitted cannot be earlier than the current element timestamp.
>>
>> Thanks,
>> Raman
>>
>>
>>
>>>   Jan
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>>
>>> Your reply made me realize I removed the condition from my local copy of
>>> the looping timer that brings the timer forward if it encounters an earlier
>>> element later in the stream:
>>>
>>> if (currentTimerValue == null || currentTimerValue >
>>> nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>>
>>>
>>> Restoring that condition fixes that issue.
>>>
>>> However, the reason I removed that condition in the first place was
>>> because it was making a unit test non-deterministic -- sometimes the
>>> element timestamps into the looping timer didn't seem to match the element
>>> timestamps according to the EARLIEST timestamp combiner defined, causing
>>> the timer to execute an additional time.
>>>
>>> The pipeline:
>>>

Re: clear State using business logic

2022-12-09 Thread Reza Ardeshir Rokni
Have you explored processing time timers?

https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/state/TimeDomain.html#PROCESSING_TIME
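
For illustration, a rough sketch of combining the two: clear the entry directly
when the unsubscribe event for that key arrives (pure business logic, no timer
needed), and optionally keep a processing-time timer as a safety-net GC for keys
that never unsubscribe. Types are hypothetical, the usual Beam SDK imports are
assumed, and it is untested.

static class SubscriptionStateFn extends DoFn<KV<String, SubEvent>, KV<String, SubEvent>> {

  @StateId("subscription")
  private final StateSpec<ValueState<SubEvent>> subSpec = StateSpecs.value();

  @TimerId("gc")
  private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("subscription") ValueState<SubEvent> sub,
      @TimerId("gc") Timer gc) {
    SubEvent e = c.element().getValue();
    if (e.isUnsubscribe()) {
      sub.clear();  // business-logic-driven expiry; no timer involved
      return;
    }
    sub.write(e);
    gc.offset(Duration.standardDays(30)).setRelative();  // optional safety net for abandoned keys
    c.output(c.element());
  }

  @OnTimer("gc")
  public void onGc(@StateId("subscription") ValueState<SubEvent> sub) {
    sub.clear();  // drop entries with no activity for a long time
  }
}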

On Wed, 23 Nov 2022 at 13:46, Sigalit Eliazov  wrote:

> Hi all,
>
> the flow in our pipeline is:
>
> 1. read event X from kafka. open fixed window of 30 sec.
> 2. read event subscription from kafka. open GlobalWindow and store a
> state of all subscriptions.
> 3. match X and Y using key and if there is a match send an event to
> another kafka topic. (we use the state as side input)
>
> if a user unsubscribes (meaning we read an unsubscribe event from a
> different source), we would like to delete the relevant entry from
> the state.
> Can this be achieved only using state and not using some external cache/db?
>
> I am aware there is an option to add timers on state but the
> expiration logic is not time based.
> Any suggestions?
>
> Thanks in advance
> Sigalit
>


Re: [Question] state scope to only key

2023-01-26 Thread Reza Ardeshir Rokni
Hi,

For these types of use cases, folks will generally make use of the Global
Window, which spans -inf to +inf, together with Timers (a minimal sketch follows
the list below). Some key considerations when using the Global Window:

1- GC is not done by the system, as the window will never close.
2- There are no order guarantees, so you will often need to make use of looping
timer patterns.
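
For illustration, a minimal sketch of state effectively scoped only to the key
(hypothetical element types, usual Beam SDK imports assumed):

// Re-window into the Global Window before the stateful ParDo; the state then
// lives for the key until you clear it yourself (e.g. from a timer).
input  // PCollection<KV<String, Event>>
    .apply("GlobalWindow", Window.<KV<String, Event>>into(new GlobalWindows()))
    .apply("KeyScopedState", ParDo.of(new DoFn<KV<String, Event>, KV<String, Event>>() {

      @StateId("perKeyCount")
      private final StateSpec<ValueState<Long>> perKeyCountSpec = StateSpecs.value();

      @ProcessElement
      public void process(ProcessContext c, @StateId("perKeyCount") ValueState<Long> count) {
        long seen = count.read() == null ? 0L : count.read();
        count.write(seen + 1);
        c.output(c.element());
      }
    }));

Note that any downstream aggregation over the Global Window would need its own
trigger, and GC of this state is up to you (e.g. via a timer, as per the looping
timer pattern above).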

Cheers

Reza

On Thu, 26 Jan 2023 at 00:09, Vignesh Kumar Kathiresan via user <
user@beam.apache.org> wrote:

> Hi Community,
>
> I am new to Beam, coming from Flink. In Flink, state can be scoped to only the
> key. A datastream (similar to PCollections in Beam) can be converted to a
> keyed data stream, and a process function on this keyed stream can access
> state scoped to only the key. It also has state scoped to key+window. In Beam,
> though, I see only state scoped to the key+window combination. Is my
> understanding correct? How are use cases using state scoped to key without
> windows solved in Beam?
>
> thanks
> Vignesh
>


Re: [E] Re: [Question] state scope to only key

2023-01-26 Thread Reza Ardeshir Rokni
So it sounds like the timestamp of the event is not important here? If that
is correct then order is not important, and you can do something like this
(not tried it out, so I might have missed a detail):

Use a CombiningState to keep a count of the elements that are being passed
through.
On a new element, check the value: if it's < x then output the element; if >= x
then set an event-time timer for now + something and add the element to a
BagState.
In the timer code, clear the bag state and reset the combining state.
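
Putting those steps into a rough sketch (hypothetical types and limits, usual
Beam SDK imports assumed, untested):

static class ThrottleFn extends DoFn<KV<String, Event>, KV<String, Event>> {

  private static final long MAX_PER_PERIOD = 100;                          // "x"
  private static final Duration THROTTLE = Duration.standardMinutes(10);   // "now + something"

  @StateId("count")
  private final StateSpec<CombiningState<Long, long[], Long>> countSpec =
      StateSpecs.combining(Sum.ofLongs());

  @StateId("held")
  private final StateSpec<BagState<KV<String, Event>>> heldSpec = StateSpecs.bag();

  @TimerId("release")
  private final TimerSpec releaseSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("count") CombiningState<Long, long[], Long> count,
      @StateId("held") BagState<KV<String, Event>> held,
      @TimerId("release") Timer release) {
    long seen = count.read();
    if (seen < MAX_PER_PERIOD) {
      count.add(1L);
      c.output(c.element());
    } else {
      // Over the limit: hold the element and arrange for the state to reset later.
      held.add(c.element());
      release.set(c.timestamp().plus(THROTTLE));
    }
  }

  @OnTimer("release")
  public void onRelease(
      @StateId("count") CombiningState<Long, long[], Long> count,
      @StateId("held") BagState<KV<String, Event>> held) {
    held.clear();   // or re-emit the held elements, depending on the use case
    count.clear();  // reset the counter so new elements flow again
  }
}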


On Thu, 26 Jan 2023 at 12:08, Vignesh Kumar Kathiresan <
vkath...@yahooinc.com> wrote:

> Thanks Reza. Does this mean having a global window and triggering for
> every new event? The use case I was mentioning was a throttle kind of
> application using key-based state. Events flow through and the keyed
> state is increased by 1, but once a particular key reaches a certain max
> count, the newer events are skipped until the state is cleared after the
> throttle period. I am looking at something similar to a stateful keyed
> parDo so that all events of same key go to the same worker (assuming state
> is local to worker as in flink)
>
> On Thu, Jan 26, 2023 at 8:13 AM Reza Ardeshir Rokni 
> wrote:
>
>> Hi,
>>
>> For these types of use cases, folks will generally make use of the Global
>> Window which is -/+ inf and Timers. Some key considerations when using the
>> Global Window:
>>
>> 1- GC is not done by the system as the window will never close.
>> 2- There are no order guarantees, so you will often need to make use of 
>> looping
>> timer
>> <https://urldefense.com/v3/__https://beam.apache.org/blog/looping-timers/__;!!Op6eflyXZCqGR5I!H02HPpgqudbKt18Xo0IC7a0LSrWD3znlzIBATW1EMItF4iCSrhnLM-ziCgsZd14hFgKlQEKBi4GQULrH$>
>> patterns.
>>
>> Cheers
>>
>> Reza
>>
>> On Thu, 26 Jan 2023 at 00:09, Vignesh Kumar Kathiresan via user <
>> user@beam.apache.org> wrote:
>>
>>> Hi Community,
>>>
>>> I am new to beam coming from flink. In flink state can be scoped to only
>>> key. A datasteam(similar to Pcollections in beam) can be converted to a
>>> keyed data stream. And a process function on this keyed stream can access
>>> state scoped to only key. It also has state scoped to key+window. In beam
>>> though I see only state scoped to key+window combination. Is my
>>> understanding correct? How are use cases using state scoped to key without
>>> windows solved in beam.
>>>
>>> thanks
>>> Vignesh
>>>
>>


Re: [E] Re: [Question] state scope to only key

2023-01-26 Thread Reza Ardeshir Rokni
PS: for the elements that flow through when < x, you will need to add a
data-driven trigger after the global window.

On Thu, 26 Jan 2023 at 20:11, Reza Ardeshir Rokni  wrote:

> So it sounds like the timestamp of the event is not important here? If
> that is correct then order is not important and you can do something like
> this ( not tried it out so I might have missed detail)..
>
> Use a CombiningState to keep a count of elements that are being passed
> through.
> On a new element check the value, if its < x then output the element if >
> x then set an EventTimer for time now+something and add the element to a
> bagstate
> In the timercode clear the bag state and reset the combinestate
>
>
> On Thu, 26 Jan 2023 at 12:08, Vignesh Kumar Kathiresan <
> vkath...@yahooinc.com> wrote:
>
>> Thanks Reza. Does this mean having a global window and triggering for
>> every new event? The use case I was mentioning was a throttle kind of
>> application using the key based state. Events flow though and the keyed
>> state is increased by 1 but once a particular key reaches a certain max
>> count, the newer events are skipped until the state is cleared after the
>> throttle period. I am looking at something similar to a stateful keyed
>> parDo so that all events of same key go to the same worker (assuming state
>> is local to worker as in flink)
>>
>> On Thu, Jan 26, 2023 at 8:13 AM Reza Ardeshir Rokni 
>> wrote:
>>
>>> Hi,
>>>
>>> For these types of use cases, folks will generally make use of the
>>> Global Window which is -/+ inf and Timers. Some key considerations when
>>> using the Global Window:
>>>
>>> 1- GC is not done by the system as the window will never close.
>>> 2- There are no order guarantees, so you will often need to make use of 
>>> looping
>>> timer
>>> <https://urldefense.com/v3/__https://beam.apache.org/blog/looping-timers/__;!!Op6eflyXZCqGR5I!H02HPpgqudbKt18Xo0IC7a0LSrWD3znlzIBATW1EMItF4iCSrhnLM-ziCgsZd14hFgKlQEKBi4GQULrH$>
>>> patterns.
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>> On Thu, 26 Jan 2023 at 00:09, Vignesh Kumar Kathiresan via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> Hi Community,
>>>>
>>>> I am new to beam coming from flink. In flink state can be scoped to
>>>> only key. A datasteam(similar to Pcollections in beam) can be converted to
>>>> a keyed data stream. And a process function on this keyed stream can access
>>>> state scoped to only key. It also has state scoped to key+window. In beam
>>>> though I see only state scoped to key+window combination. Is my
>>>> understanding correct? How are use cases using state scoped to key without
>>>> windows solved in beam.
>>>>
>>>> thanks
>>>> Vignesh
>>>>
>>>