Hi Fabian,
My dataset has the following type:
RegionType = (Long, String, Long, Long, Char, Array[GValue])
where GValue is implemented by the case classes
GString(v: String)
GDouble(v: Double)
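A minimal sketch of these types in plain Scala, assuming GValue is a sealed trait (the mail only names the two case classes). The hypothetical helper `asDouble` reads a field by pattern matching instead of a cast, and `example` is made-up illustrative data, not taken from the real dataset.

```scala
// Hypothetical reconstruction of the types described above.
// Assumption: GValue is a sealed trait with exactly these two implementations.
sealed trait GValue
case class GString(v: String) extends GValue
case class GDouble(v: Double) extends GValue

object Model {
  // Same shape as RegionType in the mail; the meaning of each field is not stated there.
  type RegionType = (Long, String, Long, Long, Char, Array[GValue])

  // Pattern matching instead of a cast: None when the field is not a GDouble.
  def asDouble(g: GValue): Option[Double] = g match {
    case GDouble(d) => Some(d)
    case GString(_) => None
  }

  // Illustrative region (hypothetical values).
  val example: RegionType =
    (1L, "chr1", 100L, 200L, '+', Array[GValue](GString("geneA"), GDouble(0.5)))
}
```

Because the trait is sealed, the compiler can warn if a new GValue subtype is added and `asDouble` is not updated.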
I have two sorting cases:
In the first (topk), I have to group by the first field of the regions and sort
by a set of fields of the GValue array.
In the second (topg), I have to group by the first field of the regions and by a
set of fields of the array, then sort by one field of the array.
For grouping, I am using the groupBy function, passing a function that
creates a hash of the desired fields, something like this:
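ds.groupBy { (r: RegionType) =>
  // key: first field plus the array fields listed in grouping.init, separated by '#'
  val s = new StringBuilder
  s.append(r._1)
  grouping.init.foreach { (index: Int) =>
    s.append("#")
    s.append(r._6(index))
  }
  Md5.hash(s.toString) // Md5.hash: project helper, not shown
}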
Then I sort it (in the topg case, i.e. the second one) using
.sortGroup(
  (r: RegionType) => r._6(grouping.last), /* here I am doing some cast; I am writing from my
  smartphone and don't remember all the details, sorry */
  Order.ASCENDING)
In the first case, instead, I group only by r._1 and use a recursive function
that appends sortGroup operators to the grouped dataset.
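For the first case (topk), one possible alternative to a recursive chain of sort steps, assuming every sort field holds a GDouble: fold all sort fields into a single composite key and compare it lexicographically, so only one sort step is needed. This is an in-memory analogue with Scala collections, not Flink code; `TopK`, `lexicographic`, `sortKey` and `topK` are hypothetical names.

```scala
// Hypothetical in-memory analogue of the topk case (plain Scala collections, not Flink).
sealed trait GValue
case class GString(v: String) extends GValue
case class GDouble(v: Double) extends GValue

object TopK {
  type RegionType = (Long, String, Long, Long, Char, Array[GValue])

  // Lexicographic order: compare the first sort field, use the next one only on ties.
  val lexicographic: Ordering[Seq[Double]] = new Ordering[Seq[Double]] {
    def compare(a: Seq[Double], b: Seq[Double]): Int =
      a.zip(b).iterator
        .map { case (x, y) => java.lang.Double.compare(x, y) }
        .find(_ != 0)
        .getOrElse(Integer.compare(a.length, b.length))
  }

  // One composite sort key holding all sort fields, instead of one sort step per field.
  // Assumption: every selected array field holds a GDouble.
  def sortKey(fields: Seq[Int]): RegionType => Seq[Double] =
    r => fields.map { i =>
      r._6(i) match {
        case GDouble(d) => d
        case other      => throw new IllegalArgumentException(s"expected GDouble, got $other")
      }
    }

  // Group by the first field, sort each group by all fields at once, keep the first k.
  def topK(rs: Seq[RegionType], fields: Seq[Int], k: Int): Map[Long, Seq[RegionType]] =
    rs.groupBy(_._1).map { case (key, g) =>
      key -> g.sortBy(sortKey(fields))(lexicographic).take(k)
    }
}
```

Whether a Flink key selector may return such a composite key (and in which form) should be checked against the Flink version in use.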
Is there a way to solve this?
I think I don't understand what a KeySelector is.
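A key selector is a function from an element to the key it should be grouped or sorted by; the alternative is naming fields by position, as in `groupBy(0)`. The plain-Scala sketch below (no Flink; `KeySelectorDemo`, `byPosition`, `bySelector`, `byComputed` and `groups` are hypothetical names) shows that a positional key and a selector returning the same field partition the rows identically, while a selector can also compute a derived key, as the hash-based grouping above does.

```scala
// Hypothetical illustration of positional keys vs key selectors (plain Scala, not Flink).
object KeySelectorDemo {
  type Row = (Long, String, Double)

  val rows: Seq[Row] = Seq((1L, "a", 0.5), (2L, "b", 1.5), (1L, "c", 2.5))

  // Positional key: the field is named by its index in the tuple (like groupBy(0)).
  def byPosition(pos: Int): Row => Any = r => r.productElement(pos)

  // Key selector: an arbitrary function from the element to its key.
  val bySelector: Row => Long = r => r._1

  // A selector may also compute a derived key, e.g. several fields joined with '#'.
  val byComputed: Row => String = r => s"${r._1}#${r._2}"

  // The groups a key produces, as sets of the String field, for easy comparison.
  def groups[K](key: Row => K): Set[Set[String]] =
    rows.groupBy(key).values.map(_.map(_._2).toSet).toSet
}
```

In Flink's Scala API the two styles correspond to `groupBy(0)` and `groupBy(r => r._1)`. Per Fabian's reply, if the grouping uses field positions, a sortGroup with a key selector function is rejected, which is what the exception message "Sorting on KeySelector keys only works with KeySelector grouping" says.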
Thanks!
Michele
________________________________
From: Fabian Hueske <[email protected]>
Sent: Tuesday, 16 June 2015 23:43:03
To: [email protected]
Subject: Re: sorting groups
Hi,
the error is related to the way you specify the grouping and sorting keys.
The API is currently restricted in that you can only use a key
selector function for the sorting key if you also used a key selector function
for the grouping key.
In Scala the use of key selector functions is often not very obvious.
If you post the groupBy().sortGroup() command and the input type, I can help
you get it right.
Cheers, Fabian
2015-06-16 23:37 GMT+02:00 Michele Bertoni <[email protected]>:
Hi everybody,
I am trying to sort a grouped dataset, but I am getting this error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Sorting on KeySelector keys only works with KeySelector grouping.
    at org.apache.flink.api.scala.GroupedDataSet.sortGroup(GroupedDataSet.scala:113)
    at it.polimi.genomics.flink.FlinkImplementation.regionOperation.OrderRD$.sort(OrderRD.scala:82)
    ...
Can anybody help me understand the error?
I have no idea what it means, and Google is not helpful in this case.
Thanks!
Cheers
Michele