Hi, 

Your original post looks like "computeThreshold" doesn't require any 
parameters, but is just an expensive to compute operation. 

In that case, you can inherit from "RichFilterFunction" instead of 
"FilterFunction". In case of "RichFilterFunction", you can override the 
"open"-method and perform your operation in there just once and store the 
result e.g. in a transient variable. In that case, nothing gets serialized and 
send over the network. The open method is guaranteed to be called only once per 
operator and is called before the first call to "filter" is made. 

The pattern to pass arguments in general is totally fine. I often pass e.g. a 
connection String as a parameter to my RichFunction and within the open method 
of the function, I establish the connection to some remote system. 

Best regards 
Theo 


Von: "Komal Mariam" <komal.mar...@gmail.com> 
An: "Chesnay Schepler" <ches...@apache.org> 
CC: "user" <user@flink.apache.org> 
Gesendet: Donnerstag, 10. Oktober 2019 04:00:46 
Betreff: Re: Passing parameters to filter function (in DataStreams) 

Thank you @Chesnay! 

I also managed to pass arguments to a RichFilterFunction: new 
MyFilterFunc(Integer threshold ) by defining its constructor. 
If there's a better way to pass arguments I'd appreciate it if you let me know. 

On Tue, 8 Oct 2019 at 19:58, Chesnay Schepler < [ mailto:ches...@apache.org | 
ches...@apache.org ] > wrote: 



You can compute the threshold ahead of time and reference it directly in the 
filter function. 

(Below are 2 examples, depending on whether you like lambdas or not) 
final int threshold = computeThreshold () ; temperatureStream.filter( new 
FilterFunction<Integer>() { @Override public boolean filter (Integer 
temperature) { return temperature > threshold ; }
}) ; 
final int threshold = computeThreshold () ; 
temperatureStream.filter(temperature -> temperature > threshold ) ; 

On 08/10/2019 12:46, Komal Mariam wrote: 

BQ_BEGIN

Hi everyone, 

Suppose I have to compute a filter condition 

Integer threshold = compute threshold(); 

If I: 

temperatureStream.filter(new FilterFunction<temperature>() { 
@Override 
public boolean filter(Integer temperature) throws Exception { 
Integer threshold = compute threshold(); 
return temperature > threshold 
} 

would this mean I have computed threshold over and over again, for every new 
element in the stream? 

my threshold does not changes once it computed. I don't want to recompute it 
every time for new elements? is there way I can pass it as a parameter to the 
filter function? 







BQ_END

Reply via email to