Hi Konstantin, and don't worry about the late response. Your gist was perfect. A few days after my email I modified a few things and I almost got it working. Besides, as some other people recommended, I have added a WindowedStream because I am not looking for a global maximum: the value can increase or decrease.
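To illustrate why a global maximum does not fit odds that can also fall, here is a minimal plain-Java sketch with no Flink dependency. The class name `WindowedMax`, the count-based tumbling window, and the sample odds are assumptions made for this illustration only; they are not taken from the gist or from the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowedMax {

    /** Running (global) maximum: it never decreases, even when the odds drop. */
    static List<Double> runningMax(List<Double> odds) {
        List<Double> out = new ArrayList<>();
        double max = Double.NEGATIVE_INFINITY;
        for (double o : odds) {
            max = Math.max(max, o);
            out.add(max);
        }
        return out;
    }

    /** Maximum per tumbling count window; the last window may be shorter. */
    static List<Double> tumblingMax(List<Double> odds, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<Double> out = new ArrayList<>();
        for (int start = 0; start < odds.size(); start += size) {
            int end = Math.min(start + size, odds.size());
            double max = Double.NEGATIVE_INFINITY;
            for (int i = start; i < end; i++) {
                max = Math.max(max, odds.get(i));
            }
            out.add(max);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical odds for one match, falling over time.
        List<Double> odds = Arrays.asList(1.90, 2.20, 2.10, 1.80, 1.70, 1.75);
        System.out.println("global:   " + runningMax(odds));
        System.out.println("windowed: " + tumblingMax(odds, 2));
    }
}
```

With the sample data the global maximum stays at 2.2 once it is reached, while the per-window maximum follows the odds down.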
I am currently working on the KeySelector, but I don't think it is going to work. This is what I have:

    // anotherDataStream is the union of two DataStreams with different or maybe similar matchName.
    // I want to compare them and get the key.
    DataStream<Informacion> example = anotherDataStream.keyBy(new KeySelector<Informacion, String>() {
        public String getKey(Informacion info) {
            // The algorithm that emits a score based on similarity
            SimilarityStrategy strategy3 = new DiceCoefficientStrategy();
            StringSimilarityService service = new StringSimilarityServiceImpl(strategy3);
            double score = service.score(info.matchName, *info2.matchName*); // Score is 0.90...
            if (score < 0.75) {
                return info.nombrePartido;
            } else {
                return null;
            }
        }
    });

As you can see, I need two strings to compare with each other, and I think that is not possible with the KeySelector.

Thanks for everything, you have helped me a lot.

On Friday, 10 June 2016, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:

> Hi again,
>
> and again sorry for the late response.
>
> Regarding your first question: You can use a Key Selector Function [1].
>
> Regarding your second question: If I understand your requirement correctly, this is already happening in my gist.
>
> By taking the union of both streams, the local and away max are taken over both streams. The coFlatMap holds the currentAwayMax and currentAwayLocal and evaluates your condition if either of the maximum values changes.
>
> Does this help?
>
> Cheers,
>
> Konstantin
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-keys
>
> On 02.06.2016 23:20, iñaki williams wrote:
> > Hi again! Thanks for your tips and gists, they are really helpful. However, I probably didn't express my idea properly and it has been a little misunderstood.
> > I have been thinking about how to do this over the past few days, and I will try to give a concrete example of what I want and ask whether my approach is the correct one. I have made a new diagram with "real" examples.
> >
> > First question: as you can see, "Match name" is quite similar but it is not always the same. The name in DataStream 1 could be "Rafa Nadal" while in the 2nd DataStream the name of the match could be "R. Nadal". Is there any way to rewrite the .keyBy() method so that it uses a library that compares Strings and matches them by similarity rather than by exact name?
> >
> > Second question: in case I can key those tennis matches, when I am doing the CoFlatMap and have these two matches, for example:
> >
> > DataStream1 <"Rafa Nadal - Roger Federer", 1.90, 2.10>
> > DataStream2 <"Rafa Nadal - Roger Federer", 2.20, 1.80>
> >
> > I would like to take the biggest value of each field, in this case 2.20 and 2.10, so the final result would be:
> > <"Rafa Nadal - Roger Federer", 2.20, 2.10>.
> >
> > I don't know if I am mistaken, but I think I could use the ValueState to save those values and compare them?
> >
> > Thanks for your time! :)
> > Very grateful.
> >
> > 2016-05-29 17:32 GMT+02:00 Konstantin Knauf <konstantin.kn...@tngtech.com>:
> >
> > Hi again,
> >
> > from your diagram I have put together a gist, which I think does the job. I haven't had the time to test it though :(
> >
> > https://gist.github.com/knaufk/d1312503b99ee51554a70c9a22abe7e5
> >
> > If you have any questions, let me know. It sometimes just takes a while until I answer ;)
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 28.05.2016 13:49, iñaki williams wrote:
> > > Hello again! :)
> > >
> > > I have been checking the solution that you proposed, but I don't really get how the KeyValueState helps with it.
> > > Could you please explain it a little more?
> > >
> > > I have drawn a diagram to make clear what I want. Note that the middle table doesn't need to be a table; it is just what I want, and I don't have enough knowledge of Flink to know how to do it.
> > >
> > > Thanks for your time!
> > >
> > > 2016-05-26 20:33 GMT+02:00 Konstantin Knauf <konstantin.kn...@tngtech.com>:
> > >
> > > Hi,
> > >
> > > interesting use case, you are looking for sure bets, I guess ;)
> > >
> > > Well, I think what you want to do then is probably to use a ConnectedStream, which you keyBy the "name" of both streams.
> > >
> > > Then you can use CoFlatMap for comparison. You can use a KeyValueState to save prices. In each map you can then check whether you already have a price for this name saved from the other stream and, if not, save the price. The challenge will be to clean up state.
> > >
> > > Let me know if this works out.
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > > On 26.05.2016 20:01, iñaki williams wrote:
> > > > Hi!
> > > >
> > > > I will explain it in more detail:
> > > >
> > > > I am comparing real-time sports odds from two different betting webpages.
> > > >
> > > > Assuming that I get just one Java object for each DataStream (in reality I should get a List of in-play matches), and assuming that the name is the same of course, what I want to do is compare both price attributes in "real time". I am only interested in the current price, not the previous one. Example:
> > > >
> > > > What is the price for Event 1 from website "X" RIGHT NOW?
> > > >
> > > > JavaObjectX.price
> > > >
> > > > What is the price for Event 1 from website "Y" RIGHT NOW?
> > > >
> > > > JavaObjectY.price
> > > >
> > > > Compare both attributes
> > > > Get a result depending on that comparison
> > > >
> > > > My Java object doesn't have a timestamp, but I think I should use one, right?
> > > >
> > > > Thanks!
> > > >
> > > > 2016-05-26 19:48 GMT+02:00 Konstantin Knauf <konstantin.kn...@tngtech.com>:
> > > >
> > > > Hi,
> > > >
> > > > let me first check if I understand your requirements correctly. I assume you want to compare the attribute price only for objects with the same name, right?
> > > >
> > > > Further, I assume the objects are some kind of offer/bid with a timestamp?
> > > >
> > > > I think the solution heavily depends on how the records that should be compared relate in time. So basically, if an object arrives from one source, which time window of objects from the other stream should be considered for comparison?
> > > >
> > > > Cheers,
> > > >
> > > > Konstantin
> > > >
> > > > On 26.05.2016 18:55, iñaki williams wrote:
> > > > > Hi!
> > > > >
> > > > > I am working on something quite similar to the stockPrice example that is posted on the webpage
> > > > > (https://flink.apache.org/news/2015/02/09/streaming-example.html)
> > > > >
> > > > > I am extracting some data from 2 different webpages and I represent the result using a Java object.
> > > > > The diagram could be something like this:
> > > > >
> > > > > DataStream1--------JavaObject(name, price) --\
> > > > >                                               \
> > > > >                                                [how to compare result?]
> > > > >                                               /
> > > > > DataStream2--------JavaObject(name, price) --/
> > > > >
> > > > > What I want to do is get the price attribute from both data objects and compare them with each other / do some math operations. For example, if the first JavaObject.price is bigger than the second JavaObject.price, then show a message.
> > > > >
> > > > > Which is the (best) way of doing this? I am new to Flink and I am quite lost :)
> > > > >
> > > > > Thanks!
> > > >
> > > > --
> > > > Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> > > > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > > > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> > > > Sitz: Unterföhring * Amtsgericht München * HRB 135082
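
On the KeySelector question at the top of this thread: a key selector receives one record at a time, so a pairwise similarity score cannot be computed inside getKey. One possible workaround, shown here as a sketch under stated assumptions and not as a tested solution, is to map every match name to a canonical key so that similar spellings produce the same string. Note that this replaces fuzzy matching with exact matching on a normalized name. The class `MatchKey`, its methods, and the "surname only" rule are hypothetical and written in plain Java with no Flink dependency.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

public class MatchKey {

    /**
     * Builds a key from a match name such as "Rafa Nadal - Roger Federer".
     * Assumption: players are separated by " - " and the surname is the last word.
     * Player order is kept, so both sources must list the players in the same order.
     */
    static String canonicalKey(String matchName) {
        return Arrays.stream(matchName.split("\\s+-\\s+"))
                .map(MatchKey::surname)
                .collect(Collectors.joining("|"));
    }

    /** Last word of the player name, lower-cased, with non-letters removed. */
    static String surname(String player) {
        String[] tokens = player.trim().split("\\s+");
        String last = tokens[tokens.length - 1];
        return last.toLowerCase(Locale.ROOT).replaceAll("[^\\p{L}]", "");
    }

    public static void main(String[] args) {
        System.out.println(canonicalKey("Rafa Nadal - Roger Federer"));
        System.out.println(canonicalKey("R. Nadal - R. Federer"));
    }
}
```

Inside the KeySelector from the top of the thread, getKey could then return something like MatchKey.canonicalKey(info.matchName). The rule is deliberately crude: two different players who share a surname would collide, so a real job would probably need a better normalization or a lookup table of known players.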