Hi,

Yes, you are looking in the right direction with the watermarks. 

First of all, you would have to use event time semantics for consistent results. 
With processing time everything would be simpler, but it would be more 
difficult to reason about the results (your choice). Secondly, you would have 
to hook up the logic of enabling query1/query2 to the event time/watermarks. 
Thirdly, you need to somehow sync the input switching with the window 
boundaries. On top of that, watermarks express a lower bound on the event time 
that you can expect. However, in order to guarantee consistency of the windows, 
you would want to control the upper bound. For example:

1. If you want to enable Query2, you would need to check what’s the 
largest/latest event time that was processed by the input splitter, let’s say 
that’s TS1 
2. That means records with event time <= TS1 have already been processed by 
Query1, starting some windows
3. The earliest point at which you could enable Query2 is thus TS1 + 1.
4. You would have to round Query2’s start time up to the start of the next time 
window, let’s say that would be TS2 = the first window start that is >= TS1 + 1
5. The input splitter must now keep sending records with event time < TS2 to 
Query1, but should already redirect records with event time >= TS2 to Query2.
6. Once the watermark for the input splitter advances past TS2, that’s when it 
can finally stop sending records to Query1, and the Query1 logic can be 
considered “completed”.  

So Query1 would be responsible for all of the data before TS2, and Query2 for 
all of the data from TS2 onwards.
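
To make the steps above a bit more concrete, here is a rough sketch in plain 
Java (deliberately not a real Flink operator). Everything in it is an 
assumption made for illustration: the class name `SwitchingSplitter`, the 
helper `nextWindowStart`, tumbling windows of a fixed size aligned at 
timestamp 0, and the two lists standing in for the outputs towards Query1 and 
Query2. With sliding windows the alignment would have to take the slide into 
account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the input splitter; not a Flink operator.
final class SwitchingSplitter {
    private final long windowSize;                  // assumed tumbling window size (ms)
    private long maxSeenTimestamp = Long.MIN_VALUE; // TS1: latest event time seen so far
    private long switchTimestamp = Long.MAX_VALUE;  // TS2: MAX_VALUE means "no switch requested"
    private boolean query1Completed = false;

    // Event times routed to each query; lists are used only for illustration.
    final List<Long> toQuery1 = new ArrayList<>();
    final List<Long> toQuery2 = new ArrayList<>();

    SwitchingSplitter(long windowSize) {
        this.windowSize = windowSize;
    }

    // Smallest window start that is >= ts, for windows aligned at timestamp 0.
    static long nextWindowStart(long ts, long windowSize) {
        return Math.floorDiv(ts + windowSize - 1, windowSize) * windowSize;
    }

    // Steps 1-4: take TS1, move to TS1 + 1, round up to the next window start (TS2).
    void requestSwitch() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            switchTimestamp = Long.MIN_VALUE; // nothing seen yet: Query2 takes everything
        } else {
            switchTimestamp = nextWindowStart(maxSeenTimestamp + 1, windowSize);
        }
    }

    // Step 5: records before TS2 still go to Query1, the rest go to Query2.
    void onRecord(long eventTime) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTime);
        if (eventTime < switchTimestamp) {
            toQuery1.add(eventTime);
        } else {
            toQuery2.add(eventTime);
        }
    }

    // Step 6: once the watermark reaches TS2, Query1 will get no more input.
    void onWatermark(long watermark) {
        if (watermark >= switchTimestamp) {
            query1Completed = true;
        }
    }

    long switchTimestamp() {
        return switchTimestamp;
    }

    boolean query1Completed() {
        return query1Completed;
    }
}
```

For example, with a window size of 1000 and a latest seen event time of 1500, 
`requestSwitch()` gives TS2 = 2000: a late record with event time 1999 still 
goes to Query1, a record with event time 2000 goes to Query2, and Query1 is 
considered completed once a watermark >= 2000 arrives.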

Alternatively, your input splitter could also buffer some records, so that you 
could enable Query2 faster, by re-sending the buffered records. But in that 
case, both Query1 and Query2 would be responsible for some portion of the data.
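
One possible reading of this buffering variant, again as a plain Java sketch 
rather than a Flink operator. The names (`BufferingSplitter`, 
`enableQuery2`), the tumbling windows aligned at timestamp 0, and the choice 
to buffer only records of windows that the watermark has not closed yet are 
all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering splitter; not a Flink operator.
final class BufferingSplitter {
    private final long windowSize;       // assumed tumbling window size (ms)
    private final List<Long> buffer = new ArrayList<>();
    private boolean query2Enabled = false;

    // Event times routed to each query; lists are used only for illustration.
    final List<Long> toQuery1 = new ArrayList<>();
    final List<Long> toQuery2 = new ArrayList<>();

    BufferingSplitter(long windowSize) {
        this.windowSize = windowSize;
    }

    // Before the switch, records go to Query1 and are also remembered;
    // after the switch, records go to Query2 only.
    void onRecord(long eventTime) {
        if (query2Enabled) {
            toQuery2.add(eventTime);
        } else {
            toQuery1.add(eventTime);
            buffer.add(eventTime);
        }
    }

    // A watermark W means no more records with event time <= W are expected,
    // so records of windows ending at or before W + 1 no longer need buffering.
    void onWatermark(long watermark) {
        long openWindowStart = Math.floorDiv(watermark + 1, windowSize) * windowSize;
        buffer.removeIf(ts -> ts < openWindowStart);
    }

    // Re-send the buffered records so Query2 can start with the open window(s).
    void enableQuery2() {
        toQuery2.addAll(buffer);
        buffer.clear();
        query2Enabled = true;
    }
}
```

Note that in this sketch the re-sent records have been seen by both queries, 
which is exactly the overlap mentioned above, so the sink would have to cope 
with that somehow.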

Piotrek

> On 17 Mar 2020, at 10:35, Mikael Gordani <mi.gord...@gmail.com> wrote:
> 
> Hi Piotr!
> 
> Continuing with my scenario, since both of the queries will share the same 
> sink, I've realized that some issues will appear when I switch queries. 
> Especially with regard to stateful operators, e.g. aggregation.
> 
> Let me provide an example:
> So, let's say that both of the queries ingest a sequence of integers and 
> compute the average of these integers over some time.
> E.g. say that query1 ingests the sequence 1,2,3,4.... 
> The windows for query1 will be [1,2,3] [2,3,4] [3,4]. 
> 
> If I'm later on "activating" query2, I need to have both of the queries 
> allowing tuples for a while, in order to allow the aggregation to finish in 
> query1 before denying it input.
> But there is a possibility that query2 might receive only the tuples 3,4, 
> which will result in the windows: [3] [3,4] [3,4]
> Later on, the output of the respective queries will be:
> Query1: 2, 3, 3.5
> Query2: 3, 3.5, 3.5
> 
> As one can see, the outputs will be different. 
> I'm thinking of using watermarks somehow to make sure that both queries have 
> processed the same amount of data before writing to the sink, but I'm a bit 
> unsure how to do it.
> Do you have any suggestions or thoughts?
> Cheers,
> 
> On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hi,
> 
> Let us know if something doesn’t work :)
> 
> Piotrek
> 
>> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gord...@gmail.com 
>> <mailto:mi.gord...@gmail.com>> wrote:
>> 
>> Hi,
>> I'll try it out =) 
>> 
>> Cheers!
>> 
>> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com 
>> <mailto:pi...@ververica.com>> wrote:
>> Hi,
>> 
>> In that case you could try to implement your `FilterFunction` as a two-input 
>> operator with a broadcast control input that sets the `global_var`. The 
>> broadcast control input can originate from some source or from some 
>> operator.
>> 
>> Piotrek
>> 
>>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com 
>>> <mailto:mi.gord...@gmail.com>> wrote:
>>> 
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve in 
>>> more detail:
>>> 
>>> Essentially, if I have two queries which have the same operators and run 
>>> in the same task, I want to figure out some way of controlling the 
>>> ingestion from a source to the respective queries in such a way that only 
>>> one of the queries receives data, based on a condition. 
>>> For more context, the second query (query2) is equipped with instrumented 
>>> operators, which are standard operators extended with some extra 
>>> functionality; in my case, they enrich the tuples with metadata.
>>> 
>>> Source --> Filter1 ---> rest of query1
>>>    |
>>>    v
>>>    Filter2 ---> rest of query2
>>> 
>>> By using filters prior to the queries, they allow records to pass depending 
>>> on a condition, let's say a global boolean variable (which is initially set 
>>> to false).
>>> If it's set to false, Filter1 will accept every record and Filter2 will 
>>> disregard every record.
>>> If it's set to true, Filter2 will accept every record and Filter1 will 
>>> disregard every record.
>>> So the filter operators look something like this: 
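>>> import org.apache.flink.api.common.functions.FilterFunction;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> 
>>> // Shared flag; a static field is only visible inside a single JVM.
>>> static boolean global_var = false;
>>> 
>>> // Lets records through to query1 while the flag is false.
>>> private static class Filter1 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return !global_var;
>>>     }
>>> }
>>> 
>>> // Lets records through to query2 while the flag is true.
>>> private static class Filter2 implements FilterFunction<Tuple> {
>>>     @Override
>>>     public boolean filter(Tuple t) throws Exception {
>>>         return global_var;
>>>     }
>>> }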
>>> 
>>> Then later on, in the respective queries, there is some processing logic 
>>> which changes the value of the global variable, thus enabling and 
>>> disabling the flow of data from the source to the respective queries.
>>> The problem is that this global variable is problematic in distributed 
>>> deployments, and I'm having a hard time figuring out how to solve that.
>>> Is it a bit more clear? =)
>> 
>> 
>> 
>> -- 
>> Kind regards,
>> Mikael Gordani
> 
> 
> 
> -- 
> Kind regards,
> Mikael Gordani
