Hi Julian,

that is OK. Actually, I solved it based on
https://edgent.apache.org/recipes/recipe_adaptable_filter_range.html

I am using MQTT to get the parameters and also to publish the values on another topic:
https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/AdaptableFilterRangeApp.java

Then I am using Flink to process the already filtered values:
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/AdaptiveFilterRangeMqttEdgent.java

My filter is not in the Flink program but in the Edgent program.

Kind Regards,
Felipe
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>*

On Fri, Dec 21, 2018 at 2:51 PM Julian Feinauer <j.feina...@pragmaticminds.de> wrote:

> Hi Felipe,
>
> sorry for responding so late.
> Can you give a bit more detail on what exactly you are trying to achieve?
>
> As I understand it, you want to send some query parameters to your Edgent
> program, which should adapt to them dynamically.
> Do I get this right?
>
> Julian
>
> On 20.12.18, 17:17, "Felipe Gutierrez" <felipe.o.gutier...@gmail.com> wrote:
>
>     Hi, it turned out that I could do it with Apache Flink using
>     CoFlatMapFunction
>     (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.html).
>     This operator receives two streams: on one of them I can send my
>     parameters, which change the filter on the other stream.
>
>     Is there a similar operator in Apache Edgent?
>
>     Thanks,
>     Felipe
>
>     On Wed, Dec 19, 2018 at 5:41 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>     > Hi,
>     >
>     > Let me explain why I want to do that, so you can also see if it is
>     > reasonable or not. I want to do local processing on the data source node
>     > (that is what Edgent is for, am I right?).
>     > Nevertheless, I want to execute
>     > my query using a dataflow engine (Flink) on the cluster [1]. So, instead
>     > of filtering my data on the cluster, I aim to do it early with Apache
>     > Edgent [2].
>     >
>     > If you look at the code, you will see that I am publishing data with MQTT
>     > using Edgent and consuming it with the MQTT connector in my Flink program.
>     > So, suppose I want to execute a simple filter in the Flink program. However, I
>     > know that I can do this in Edgent. I just want to pass some parameters to
>     > my Edgent program and have the stream accept them at runtime. Is that
>     > possible?
>     >
>     > PS: If not, I am also willing to receive some advice on implementing it in
>     > the source code.
>     >
>     > [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/SensorsReadingMqttEdgentQEP.java
>     > [2] https://github.com/felipegutierrez/explore-rpi/blob/master/src/main/java/org/sense/edgent/app/TempSensorMqttApp.java
>     >
>     > Kind Regards,
>     > Felipe
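
For readers of this thread, the adaptable-filter idea discussed above can be sketched roughly as follows. This is only a minimal illustration in plain Java, not the Edgent API and not the code in the linked repositories: the class name, the `Range` type, the `onParameterMessage` callback and the "low,high" payload format are all assumptions made for the example, and the MQTT wiring is left out. The point it tries to show is that the filter predicate reads its range from a shared reference on every tuple, so a parameter message can swap the range while the stream keeps running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class AdaptableRangeFilterSketch {

    /** Immutable closed interval [low, high]; a hypothetical stand-in for a range type. */
    static final class Range {
        final double low;
        final double high;

        Range(double low, double high) {
            if (low > high) {
                throw new IllegalArgumentException("low must not exceed high");
            }
            this.low = low;
            this.high = high;
        }

        boolean contains(double value) {
            return value >= low && value <= high;
        }
    }

    /** Parses an assumed parameter payload of the form "low,high". */
    static Range parseRange(String payload) {
        String[] parts = payload.trim().split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected \"low,high\" but got: " + payload);
        }
        return new Range(Double.parseDouble(parts[0].trim()),
                         Double.parseDouble(parts[1].trim()));
    }

    // Current filter parameters; replaced atomically when new parameters arrive.
    private final AtomicReference<Range> rangeRef;

    AdaptableRangeFilterSketch(Range initial) {
        this.rangeRef = new AtomicReference<>(initial);
    }

    /** Would be called by the subscriber of the parameter topic (wiring omitted). */
    void onParameterMessage(String payload) {
        rangeRef.set(parseRange(payload));
    }

    /** Predicate that looks up the current range each time it is evaluated. */
    Predicate<Double> filter() {
        return value -> rangeRef.get().contains(value);
    }

    public static void main(String[] args) {
        AdaptableRangeFilterSketch sketch =
                new AdaptableRangeFilterSketch(new Range(0.0, 10.0));
        Predicate<Double> keep = sketch.filter();
        double[] readings = {5.0, 15.0, 25.0};
        List<Double> published = new ArrayList<>();

        // First pass: only readings inside [0, 10] pass the filter.
        for (double r : readings) {
            if (keep.test(r)) published.add(r);
        }

        // A parameter message changes the range without rebuilding the filter.
        sketch.onParameterMessage("10.0,20.0");

        // Second pass: the same predicate now keeps readings inside [10, 20].
        for (double r : readings) {
            if (keep.test(r)) published.add(r);
        }

        System.out.println(published); // prints [5.0, 15.0]
    }
}
```

This sketch keeps readings inside the range; negating the predicate would instead keep only the readings that fall outside it. In a real Edgent application the predicate would be passed to the stream's filter step and `onParameterMessage` would be driven by the MQTT subscription.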