Hi,

Let us know if something doesn’t work :)
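
In case it helps, here is a minimal, Flink-free sketch of the two-input idea from the quoted mail below: one data input plus a broadcast control input that replaces the shared `global_var`. The names `ControlledFilter`, `processControl` and `ControlledFilterDemo` are hypothetical and only model the logic in plain Java. In a real job the two methods would roughly correspond to `processBroadcastElement` and `processElement` of a `BroadcastProcessFunction`, which is an assumption about the wiring and is not shown or tested here.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a two-input filter (hypothetical names, not Flink API).
final class ControlledFilter<T> {
    private final boolean passWhenFlagIs; // flag value for which records pass
    private boolean flag = false;         // local copy of global_var, initially false

    ControlledFilter(boolean passWhenFlagIs) {
        this.passWhenFlagIs = passWhenFlagIs;
    }

    // Control input: with a broadcast, every parallel instance sees every update.
    void processControl(boolean newValue) {
        flag = newValue;
    }

    // Data input: forward the record only while the local flag matches.
    void processElement(T record, List<T> out) {
        if (flag == passWhenFlagIs) {
            out.add(record);
        }
    }
}

final class ControlledFilterDemo {
    public static void main(String[] args) {
        // Mirrors the quoted code: filter1 returns !global_var, filter2 returns global_var.
        ControlledFilter<String> filter1 = new ControlledFilter<>(false);
        ControlledFilter<String> filter2 = new ControlledFilter<>(true);
        List<String> query1 = new ArrayList<>();
        List<String> query2 = new ArrayList<>();

        // Flag is still false: only query1 receives "a".
        filter1.processElement("a", query1);
        filter2.processElement("a", query2);

        // A control record is delivered to both filters.
        filter1.processControl(true);
        filter2.processControl(true);

        // Flag is now true: only query2 receives "b".
        filter1.processElement("b", query1);
        filter2.processElement("b", query2);

        System.out.println("query1=" + query1 + " query2=" + query2);
        // prints: query1=[a] query2=[b]
    }
}
```

The point of the design is that each filter instance keeps its own copy of the flag and only changes it when a control record arrives, so nothing depends on a variable shared across JVMs.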

Piotrek

> On 16 Mar 2020, at 08:42, Mikael Gordani <mi.gord...@gmail.com> wrote:
> 
> Hi,
> I'll try it out =) 
> 
> Cheers!
> 
> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> In that case you could try to implement your `FilterFunction` as a two-input 
> operator with a broadcast control input that sets `global_var`. The broadcast 
> control input can originate from a source or from another operator.
> 
> Piotrek
> 
>> On 13 Mar 2020, at 15:47, Mikael Gordani <mi.gord...@gmail.com> wrote:
>> 
>> Hi Piotr!
>> Thanks for your response, I'll try to explain what I'm trying to achieve in 
>> more detail:
>> 
>> Essentially, if I have two queries that use the same operators and run in 
>> the same task, I want to find a way to control the ingestion from a source 
>> to the respective queries so that only one of the queries receives data, 
>> based on a condition. 
>> For more context, the second query (query2), is equipped with instrumented 
>> operators, which are standard operators extended with some extra 
>> functionality, in my case, they enrich the tuples with meta-data.
>> 
>> Source --> Filter1 ---> rest of query1
>>    |
>>    v
>>    Filter2 ---> rest of query2
>> 
>> Filters placed in front of the queries allow records to pass depending on 
>> a condition, let's say a global boolean variable (which is initially set 
>> to false).
>> If it's set to false, Filter1 will accept every record and Filter2 will 
>> discard every record.
>> If it's set to true, Filter2 will accept every record and Filter1 will 
>> discard every record.
>> So the filter operators look something like this: 
>> // Shared flag; static so the static nested classes below can read it.
>> static boolean global_var = false;
>> 
>> private static class filter1 implements FilterFunction<Tuple> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return !global_var; // pass records while the flag is false
>>     }
>> }
>> 
>> private static class filter2 implements FilterFunction<Tuple> {
>>     @Override
>>     public boolean filter(Tuple t) throws Exception {
>>         return global_var; // pass records while the flag is true
>>     }
>> }
>> 
>> Later on, in the respective queries, some processing logic changes the 
>> value of the global variable, thus enabling and disabling the flow of data 
>> from the source to the respective queries.
>> The problem is that this global variable does not work in distributed 
>> deployments, and I'm having a hard time figuring out how to solve that.
>> Is it a bit clearer? =)
> 
> 
> 
> -- 
> Best regards,
> Mikael Gordani
