Hi Timur,

You can use a composite key [1] to compare keys consisting of multiple fields. 
For example:

```
import org.apache.flink.api.scala._

case class Thing(f1: String, f2: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where("f1", "f2") // Flink compares f1 first, and compares f2 only if the f1 values are equal.
  .equalTo("f1", "f2") { // Note that you must specify the same number of keys on both sides.
    (left, right) => 1
  }
```
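To make the grouping semantics concrete without a cluster, here is a plain Scala sketch (no Flink involved) that mimics which elements coGroup puts together for the composite key (f1, f2). `coGroupLocal` and `CompositeKeySketch` are hypothetical names for illustration only; Flink itself groups the data with distributed sorting or hashing, not like this.

```scala
// Hypothetical in-memory illustration of coGroup semantics on a composite key.
// This is NOT how Flink executes coGroup; it only shows which elements end up
// in the same group when the key is (f1, f2).
case class Thing(f1: String, f2: String)

object CompositeKeySketch {
  // Hypothetical helper: for every key present on either side, call `fun` with
  // all left elements and all right elements that share that key.
  def coGroupLocal[L, R, K, O](left: Seq[L], right: Seq[R])(
      leftKey: L => K, rightKey: R => K)(fun: (Seq[L], Seq[R]) => O): Seq[(K, O)] = {
    val l = left.groupBy(leftKey)
    val r = right.groupBy(rightKey)
    (l.keySet ++ r.keySet).toSeq.map { k =>
      k -> fun(l.getOrElse(k, Seq.empty), r.getOrElse(k, Seq.empty))
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(Thing("a", "b"), Thing("c", "d"))
    val b = Seq(Thing("a", "x"), Thing("z", "m"))
    // Count how many elements each side contributes to every composite-key group.
    val groups = coGroupLocal(a, b)(t => (t.f1, t.f2), t => (t.f1, t.f2)) {
      (left, right) => (left.size, right.size)
    }
    // The tuple Ordering used here also compares f1 first and f2 only on ties.
    groups.sortBy(_._1).foreach(println)
  }
}
```

With these inputs no group contains elements from both sides: Thing("a", "b") and Thing("a", "x") agree on f1 but differ on f2, so they land in different groups.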

Composite keys can also be applied to Scala tuples:

```
val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
a.coGroup(b)
  .where(0, 1) // Note that field numbers start from 0.
  .equalTo(0, 1) {
    (left, right) => 1
  }
```
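Keying on the field values themselves also sidesteps the collision issue of the hashCode workaround in the quoted question: two distinct keys can share the same Int hash, and a coGroup keyed on that Int would then merge their groups. A plain Scala check (no Flink involved; `HashKeyCollision` is just an illustrative name):

```scala
// Plain Scala, no Flink: shows why an Int hash of the key tuple is not a safe
// substitute for the key itself.
object HashKeyCollision {
  def main(args: Array[String]): Unit = {
    val k1 = ("Aa", "x")
    val k2 = ("BB", "x")
    // "Aa".hashCode == "BB".hashCode (both 2112), and a tuple's hash is built
    // from its elements' hashes, so these two different keys collide.
    println(k1.hashCode == k2.hashCode)
    println(k1 == k2)
  }
}
```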

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples

Regards,
Chiwan Park

> On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> Hello,
> 
> Another issue I have encountered is incorrect implicit resolution (I'm using 
> Scala 2.11.7). Here's the example (with a workaround):
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(e => e.f1)
>   //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
>   .equalTo("f1") {
>     (left, right) => 1
>   }
> However, the workaround does not quite work when key is a tuple (I suspect 
> this applies to other generic classes as well):
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1 } // throws InvalidProgramException
> Here, I try to provide the implicit TypeInformation explicitly, but 
> apparently it's not compatible with the way implicit inference is done. 
> (TypeInformation I generate is GenericType<scala.Tuple2>, while 
> scala.Tuple2<String, String> is expected).
> 
> Now, I can split this into two operations, as below:
> val tmp = a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))
> 
> tmp { (left, right) => 1}
> but, I would like to avoid adding clutter to my processing logic, and it's 
> not entirely clear to me how this would be scheduled.
> 
> As an option, I can hash the hell out of my keys like that:
> a.coGroup(b)
>   .where(e => (e.f1, e.f2).hashCode)
>   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
> but that, again, adds some indirection and clutter, not to mention the hassle 
> of dealing with collisions (which can be alleviated by using fancy hashes, 
> but I'd like to avoid that).
> 
> Any insights on what is the way to go here are highly appreciated.
> 
> Thanks,
> Timur
