Sure,
here is some pseudo code:

public class CentroidMover implements GroupReduceFunction<Point, Centroid> {
    public void reduce(Iterable<Point> points, Collector<Centroid> out) {
        int cnt = 0;
        Centroid sum = new Centroid(0, 0);
        for (Point p : points) {
            sum = sum.add(p); // your addition logic goes here
            cnt++;
        }
        out.collect(sum.div(cnt)); // your division logic goes here
    }
}

This function computes the sum and the count of a group and emits the final
average.
Is this what you are looking for?
2015-05-26 11:34 GMT+02:00 Pa Rö <[email protected]>:
> thanks for your message,
>
> maybe you can give me an example for the GroupReduceFunction?
>
> 2015-05-22 23:29 GMT+02:00 Fabian Hueske <[email protected]>:
>
>> There are two ways to do that:
>>
>> 1) You use a GroupReduceFunction, which gives you an iterator over all
>> points similar to Hadoop's ReduceFunction.
>> 2) You use the ReduceFunction to compute the sum and the count at the
>> same time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
>> final division.
>>
>> I'd go with the first choice. It's easier.
>>
>> Best, Fabian
>>
>> 2015-05-22 23:09 GMT+02:00 Paul Röwer <[email protected]>:
>>
>>> good evening,
>>>
>>> sorry, my english is not the best.
>>>
>>> to compute the new centroid, i sum all points of the cluster and
>>> form the new center.
>>> in my other implementation i first sum all points and at the end
>>> divide by the number of points.
>>> for example: (1+2+3+4)/4 = 2.5
>>>
>>> in flink i always reduce two points to one,
>>> for the example above: (1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 -->
>>> (2.25+4)/2 = 3.125
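>>>
>>> the difference between the two strategies, sketched in plain java
>>> (just the arithmetic, not flink code):

```java
// Plain-Java sketch contrasting the two averaging strategies from the
// example above: sum-then-divide vs. repeated pairwise averaging.
public class AveragingDemo {

    // Strategy 1: sum all values, divide once by the count.
    static double sumThenDivide(double[] values) {
        double sum = 0;
        for (double v : values) sum += v;
        return sum / values.length;
    }

    // Strategy 2: pairwise averaging, as a plain ReduceFunction does it.
    // Earlier values lose weight with every reduction step.
    static double pairwiseAverage(double[] values) {
        double running = values[0];
        for (int i = 1; i < values.length; i++) {
            running = (running + values[i]) / 2;
        }
        return running;
    }

    public static void main(String[] args) {
        double[] values = {1, 2, 3, 4};
        System.out.println(sumThenDivide(values));   // 2.5
        System.out.println(pairwiseAverage(values)); // 3.125
    }
}
```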
>>>
>>> how can i rewrite my function so that it works like my other
>>> implementation?
>>>
>>> best regards,
>>> paul
>>>
>>>
>>>
>>> Am 22.05.2015 um 16:52 schrieb Stephan Ewen:
>>>
>>> Sorry, I don't understand the question.
>>>
>>> Can you describe a bit better what you mean with "how i can sum all
>>> points and share thoug the counter" ?
>>>
>>> Thanks!
>>>
>>> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <[email protected]>
>>> wrote:
>>>
>>>> i have fixed a bug in the input reading, but the results are still
>>>> different.
>>>>
>>>> i think i have localized the problem: in the other implementation i sum
>>>> all geo points/time points and divide by the counter.
>>>> but in flink i sum two points and divide by two, and sum the next...
>>>>
>>>> the method is the following:
>>>>
>>>> // sums and counts point coordinates
>>>> private static final class CentroidAccumulator implements
>>>>         ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>
>>>>     private static final long serialVersionUID = -4868797820391121771L;
>>>>
>>>>     public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>>             Tuple2<Integer, GeoTimeDataTupel> val1,
>>>>             Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>>         return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>>                 addAndDiv(val1.f0, val1.f1, val2.f1));
>>>>     }
>>>> }
>>>>
>>>> private static GeoTimeDataTupel addAndDiv(int clusterid,
>>>>         GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>>>>     long time = (input1.getTime() + input2.getTime()) / 2;
>>>>     List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>     list.add(input1.getGeo());
>>>>     list.add(input2.getGeo());
>>>>     LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>
>>>>     return new GeoTimeDataTupel(geo, time, "POINT");
>>>> }
>>>>
>>>> how can i sum all points and divide by the counter?
>>>>
>>>>
>>>> 2015-05-22 9:53 GMT+02:00 Pa Rö <[email protected]>:
>>>>
>>>>> hi,
>>>>> if i print the centroids, all are shown in the output. i have
>>>>> implemented k-means with map reduce and spark. with the same input,
>>>>> i get the same output.
>>>>> but in flink i get a one-cluster output with this input set. (i use
>>>>> csv files from the GDELT project)
>>>>>
>>>>> here my class:
>>>>>
>>>>> public class FlinkMain {
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>         // load properties
>>>>>         Properties pro = new Properties();
>>>>>         try {
>>>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>         int maxIteration = 1;//Integer.parseInt(pro.getProperty("maxiterations"));
>>>>>         String outputPath = pro.getProperty("flink.output");
>>>>>         // set up execution environment
>>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>         // get input points
>>>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>>>>         // set number of bulk iterations for KMeans algorithm
>>>>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>>>             // compute closest centroid for each point
>>>>>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>>>>             // count and sum point coordinates for each centroid
>>>>>             .groupBy(0).reduce(new CentroidAccumulator())
>>>>>             // compute new centroids from point counts and coordinate sums
>>>>>             .map(new CentroidAverager());
>>>>>         // feed new centroids back into next iteration
>>>>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>>>             // assign points to final clusters
>>>>>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>         // emit result
>>>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>>>>         // execute program
>>>>>         try {
>>>>>             env.execute("KMeans Flink");
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static final class SelectNearestCenter extends
>>>>>             RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>>>>>
>>>>>         private static final long serialVersionUID = -2729445046389350264L;
>>>>>         private Collection<GeoTimeDataCenter> centroids;
>>>>>
>>>>>         @Override
>>>>>         public void open(Configuration parameters) throws Exception {
>>>>>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>>>>>             double minDistance = Double.MAX_VALUE;
>>>>>             int closestCentroidId = -1;
>>>>>
>>>>>             // check all cluster centers
>>>>>             for (GeoTimeDataCenter centroid : centroids) {
>>>>>                 // compute distance
>>>>>                 double distance = Distance.ComputeDist(point, centroid);
>>>>>                 // update nearest cluster if necessary
>>>>>                 if (distance < minDistance) {
>>>>>                     minDistance = distance;
>>>>>                     closestCentroidId = centroid.getId();
>>>>>                 }
>>>>>             }
>>>>>             // emit a new record with the center id and the data point
>>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     // sums and counts point coordinates
>>>>>     private static final class CentroidAccumulator implements
>>>>>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>>
>>>>>         private static final long serialVersionUID = -4868797820391121771L;
>>>>>
>>>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>>>                 Tuple2<Integer, GeoTimeDataTupel> val1,
>>>>>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>>>                     addAndDiv(val1.f1, val2.f1));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>>>>>             GeoTimeDataTupel input2) {
>>>>>         long time = (input1.getTime() + input2.getTime()) / 2;
>>>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>>         list.add(input1.getGeo());
>>>>>         list.add(input2.getGeo());
>>>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>>
>>>>>         return new GeoTimeDataTupel(geo, time, "POINT");
>>>>>     }
>>>>>
>>>>>     // computes new centroid from coordinate sum and count of points
>>>>>     private static final class CentroidAverager implements
>>>>>             MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>>>>
>>>>>         private static final long serialVersionUID = -2687234478847261803L;
>>>>>
>>>>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>>>>>             return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>>>>         // load properties
>>>>>         Properties pro = new Properties();
>>>>>         try {
>>>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>         String inputFile = pro.getProperty("input");
>>>>>         // map csv file
>>>>>         return env.readCsvFile(inputFile)
>>>>>             .ignoreInvalidLines()
>>>>>             .fieldDelimiter('\u0009')
>>>>>             //.fieldDelimiter("\t")
>>>>>             //.lineDelimiter("\n")
>>>>>             .includeFields(true, true, false, false, false, false, false, false, false, false, false
>>>>>                 , false, false, false, false, false, false, false, false, false, false
>>>>>                 , false, false, false, false, false, false, false, false, false, false
>>>>>                 , false, false, false, false, false, false, false, false, true, true
>>>>>                 , false, false, false, false, false, false, false, false, false, false
>>>>>                 , false, false, false, false, false, false, false, false)
>>>>>             //.includeFields(true,true,true,true)
>>>>>             .types(String.class, Long.class, Double.class, Double.class)
>>>>>             .map(new TuplePointConverter());
>>>>>     }
>>>>>
>>>>>     private static final class TuplePointConverter implements
>>>>>             MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>>>>>
>>>>>         private static final long serialVersionUID = 3485560278562719538L;
>>>>>
>>>>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>>>>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>>>>>         // load properties
>>>>>         Properties pro = new Properties();
>>>>>         try {
>>>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>         String seedFile = pro.getProperty("seed.file");
>>>>>         boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>>>>         // get points from file or random
>>>>>         if (seedFlag || !(new File(seedFile+"-1").exists())) {
>>>>>             Seeding.randomSeeding();
>>>>>         }
>>>>>         // map csv file
>>>>>         return env.readCsvFile(seedFile+"-1")
>>>>>             .lineDelimiter("\n")
>>>>>             .fieldDelimiter('\u0009')
>>>>>             //.fieldDelimiter("\t")
>>>>>             .includeFields(true, true, true, true)
>>>>>             .types(Integer.class, Double.class, Double.class, Long.class)
>>>>>             .map(new TupleCentroidConverter());
>>>>>     }
>>>>>
>>>>>     private static final class TupleCentroidConverter implements
>>>>>             MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>>>>>
>>>>>         private static final long serialVersionUID = -1046538744363026794L;
>>>>>
>>>>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>>>>>             return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <[email protected]>:
>>>>>
>>>>>> Concerning your first problem that you only see one resulting
>>>>>> centroid, your code looks good modulo the parts you haven't posted.
>>>>>>
>>>>>> However, your problem could simply be caused by a bad selection of
>>>>>> initial centroids. If, for example, all centroids except one don't
>>>>>> get any points assigned, then only one centroid will survive the
>>>>>> iteration step. How do you select the initial centroids?
>>>>>>
>>>>>> To check that all centroids are read you can print the contents of
>>>>>> the centroids DataSet. Furthermore, you can simply println the new
>>>>>> centroids after each iteration step. In local mode you can then observe
>>>>>> the
>>>>>> computation.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> This problem should not depend on any user code. There are no
>>>>>>> user-code dependent actors in Flink.
>>>>>>>
>>>>>>> Is there more stack trace that you can send us? It looks like the
>>>>>>> core exception that is causing the issue is not part of the stack
>>>>>>> trace.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> hi flink community,
>>>>>>>>
>>>>>>>> i have implemented k-means for clustering temporal geo data. i use
>>>>>>>> the following github project and my own data structure:
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>>>>
>>>>>>>> now i have the problem that flink reads the centroids from file and
>>>>>>>> continues working in parallel. if i look at the results, i have the
>>>>>>>> feeling that the program loads only one centroid point.
>>>>>>>>
>>>>>>>> i work with flink 0.8.1, if i update to 0.9 milestone 1 i get the
>>>>>>>> following exception:
>>>>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>>>>     ... 9 more
>>>>>>>>
>>>>>>>> how can i tell flink that it should wait for the dataset to load,
>>>>>>>> and what does this exception mean?
>>>>>>>>
>>>>>>>> best regards,
>>>>>>>> paul
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>