I have fixed a bug in the input reading, but the results are still different.
I think I have localized the problem: in the other implementation I sum up all
geo points/time points and divide by the count, but in Flink I sum two points
and divide by two, then sum the next pair, and so on.
The method is the following:
// sums and counts point coordinates
private static final class CentroidAccumulator implements
        ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

    private static final long serialVersionUID = -4868797820391121771L;

    public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1,
            Tuple2<Integer, GeoTimeDataTupel> val2) {
        return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
                addAndDiv(val1.f0, val1.f1, val2.f1));
    }
}

private static GeoTimeDataTupel addAndDiv(int clusterid,
        GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
    long time = (input1.getTime() + input2.getTime()) / 2;
    List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
    list.add(input1.getGeo());
    list.add(input2.getGeo());
    LatLongSeriable geo = Geometry.getGeoCenterOf(list);
    return new GeoTimeDataTupel(geo, time, "POINT");
}
How can I sum up all points and divide by the count?
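
For reference: the official Flink KMeans example, linked further down in this
thread, handles exactly this with a count-and-sum pattern: carry an explicit
count through the reduce and divide only once, in the final map. Below is a
minimal sketch adapted to the types above; the getLat()/getLon() accessors on
LatLongSeriable are assumptions, everything else follows the posted code
(serialVersionUIDs omitted for brevity).

// imports assumed: org.apache.flink.api.common.functions.MapFunction,
// org.apache.flink.api.common.functions.ReduceFunction,
// org.apache.flink.api.java.tuple.Tuple2 and Tuple3

// appends an initial count of 1 to each (centroidId, point) record
private static final class CountAppender implements
        MapFunction<Tuple2<Integer, GeoTimeDataTupel>, Tuple3<Integer, GeoTimeDataTupel, Long>> {

    public Tuple3<Integer, GeoTimeDataTupel, Long> map(Tuple2<Integer, GeoTimeDataTupel> t) {
        return new Tuple3<Integer, GeoTimeDataTupel, Long>(t.f0, t.f1, 1L);
    }
}

// sums coordinates, times, and counts; note: no division here
private static final class CentroidAccumulator implements
        ReduceFunction<Tuple3<Integer, GeoTimeDataTupel, Long>> {

    public Tuple3<Integer, GeoTimeDataTupel, Long> reduce(
            Tuple3<Integer, GeoTimeDataTupel, Long> val1,
            Tuple3<Integer, GeoTimeDataTupel, Long> val2) {
        // getLat()/getLon() are assumed accessors on LatLongSeriable
        LatLongSeriable geoSum = new LatLongSeriable(
                val1.f1.getGeo().getLat() + val2.f1.getGeo().getLat(),
                val1.f1.getGeo().getLon() + val2.f1.getGeo().getLon());
        long timeSum = val1.f1.getTime() + val2.f1.getTime();
        return new Tuple3<Integer, GeoTimeDataTupel, Long>(
                val1.f0, new GeoTimeDataTupel(geoSum, timeSum, "SUM"), val1.f2 + val2.f2);
    }
}

// divides the sums by the count exactly once
private static final class CentroidAverager implements
        MapFunction<Tuple3<Integer, GeoTimeDataTupel, Long>, GeoTimeDataCenter> {

    public GeoTimeDataCenter map(Tuple3<Integer, GeoTimeDataTupel, Long> value) {
        long count = value.f2;
        LatLongSeriable mean = new LatLongSeriable(
                value.f1.getGeo().getLat() / count,
                value.f1.getGeo().getLon() / count);
        return new GeoTimeDataCenter(value.f0, mean, value.f1.getTime() / count);
    }
}

The iteration body then becomes:

    .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
    .map(new CountAppender())
    .groupBy(0).reduce(new CentroidAccumulator())
    .map(new CentroidAverager());

Because the sums are associative, the result no longer depends on the order in
which the reducer combines the records.
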
2015-05-22 9:53 GMT+02:00 Pa Rö <[email protected]>:
> hi,
> if I print the centroids, all of them show up in the output. I have
> implemented k-means with MapReduce and with Spark; given the same input, I
> get the same output. But in Flink I get a single-cluster output with this
> input set. (I use CSV files from the GDELT project.)
>
> here is my class:
>
> public class FlinkMain {
>
>     public static void main(String[] args) {
>         // load properties
>         Properties pro = new Properties();
>         try {
>             pro.load(new FileInputStream("./resources/config.properties"));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         int maxIteration = 1; //Integer.parseInt(pro.getProperty("maxiterations"));
>         String outputPath = pro.getProperty("flink.output");
>         // set up execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // get input points
>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>         // set number of bulk iterations for KMeans algorithm
>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>         DataSet<GeoTimeDataCenter> newCentroids = points
>             // compute closest centroid for each point
>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>             // count and sum point coordinates for each centroid
>             .groupBy(0).reduce(new CentroidAccumulator())
>             // compute new centroids from point counts and coordinate sums
>             .map(new CentroidAverager());
>         // feed new centroids back into next iteration
>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>             // assign points to final clusters
>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>         // emit result
>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>         finalCentroids.writeAsText(outputPath+"/centers"); //print();
>         // execute program
>         try {
>             env.execute("KMeans Flink");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     private static final class SelectNearestCenter extends
>             RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>
>         private static final long serialVersionUID = -2729445046389350264L;
>         private Collection<GeoTimeDataCenter> centroids;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>         }
>
>         @Override
>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>             double minDistance = Double.MAX_VALUE;
>             int closestCentroidId = -1;
>             // check all cluster centers
>             for(GeoTimeDataCenter centroid : centroids) {
>                 // compute distance
>                 double distance = Distance.ComputeDist(point, centroid);
>                 // update nearest cluster if necessary
>                 if(distance < minDistance) {
>                     minDistance = distance;
>                     closestCentroidId = centroid.getId();
>                 }
>             }
>             // emit a new record with the center id and the data point
>             return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>         }
>     }
>
>     // sums and counts point coordinates
>     private static final class CentroidAccumulator implements
>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>
>         private static final long serialVersionUID = -4868797820391121771L;
>
>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1,
>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1, val2.f1));
>         }
>     }
>
>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>         long time = (input1.getTime() + input2.getTime()) / 2;
>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>         list.add(input1.getGeo());
>         list.add(input2.getGeo());
>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>
>         return new GeoTimeDataTupel(geo, time, "POINT");
>     }
>
>     // computes new centroid from coordinate sum and count of points
>     private static final class CentroidAverager implements
>             MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>
>         private static final long serialVersionUID = -2687234478847261803L;
>
>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>             return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
>         }
>     }
>
>     private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>         // load properties
>         Properties pro = new Properties();
>         try {
>             pro.load(new FileInputStream("./resources/config.properties"));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         String inputFile = pro.getProperty("input");
>         // map csv file
>         return env.readCsvFile(inputFile)
>             .ignoreInvalidLines()
>             .fieldDelimiter('\u0009')
>             //.fieldDelimiter("\t")
>             //.lineDelimiter("\n")
>             .includeFields(true, true, false, false, false, false, false, false, false, false,
>                     false, false, false, false, false, false, false, false, false, false,
>                     false, false, false, false, false, false, false, false, false, false,
>                     false, false, false, false, false, false, false, false, false, true,
>                     true, false, false, false, false, false, false, false, false, false,
>                     false, false, false, false, false, false, false, false, false)
>             //.includeFields(true,true,true,true)
>             .types(String.class, Long.class, Double.class, Double.class)
>             .map(new TuplePointConverter());
>     }
>
>     private static final class TuplePointConverter implements
>             MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>
>         private static final long serialVersionUID = 3485560278562719538L;
>
>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>         }
>     }
>
>     private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>         // load properties
>         Properties pro = new Properties();
>         try {
>             pro.load(new FileInputStream("./resources/config.properties"));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         String seedFile = pro.getProperty("seed.file");
>         boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>         // get points from file or random
>         if(seedFlag || !(new File(seedFile+"-1").exists())) {
>             Seeding.randomSeeding();
>         }
>         // map csv file
>         return env.readCsvFile(seedFile+"-1")
>             .lineDelimiter("\n")
>             .fieldDelimiter('\u0009')
>             //.fieldDelimiter("\t")
>             .includeFields(true, true, true, true)
>             .types(Integer.class, Double.class, Double.class, Long.class)
>             .map(new TupleCentroidConverter());
>     }
>
>     private static final class TupleCentroidConverter implements
>             MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>
>         private static final long serialVersionUID = -1046538744363026794L;
>
>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>             return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>         }
>     }
> }
>
> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <[email protected]>:
>
>> Concerning your first problem that you only see one resulting centroid,
>> your code looks good modulo the parts you haven't posted.
>>
>> However, your problem could simply be caused by a bad selection of
>> initial centroids. If, for example, all centroids except one get no
>> points assigned, then only one centroid will survive the iteration
>> step. How do you select the initial centroids?
>>
>> To check that all centroids are read you can print the contents of the
>> centroids DataSet. Furthermore, you can simply println the new centroids
>> after each iteration step. In local mode you can then observe the
>> computation.
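>>
>> A minimal sketch of such debug output (the pass-through map assumes
>> GeoTimeDataCenter has a readable toString(); the exact behaviour of
>> print() differs slightly between Flink versions):
>>
>> // print the centroids that were read from the seed file
>> centroids.print();
>>
>> // log every new centroid of each iteration step via a pass-through map
>> DataSet<GeoTimeDataCenter> newCentroids = points
>>     .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>     .groupBy(0).reduce(new CentroidAccumulator())
>>     .map(new CentroidAverager())
>>     .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
>>         public GeoTimeDataCenter map(GeoTimeDataCenter c) {
>>             System.out.println(c); // shows up on stdout in local mode
>>             return c;
>>         }
>>     });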
>>
>> Cheers,
>> Till
>>
>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <[email protected]> wrote:
>>
>>> Hi!
>>>
>>> This problem should not depend on any user code. There are no user-code
>>> dependent actors in Flink.
>>>
>>> Is there more of the stack trace that you can send us? It looks like the
>>> core exception that is causing the issue is not part of the stack trace.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <[email protected]>
>>> wrote:
>>>
>>>> hi Flink community,
>>>>
>>>> I have implemented k-means for clustering temporal geo data. I use the
>>>> following GitHub project together with my own data structure:
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>
>>>> Now I have the problem that Flink reads the centroids from the file and
>>>> keeps working in parallel. If I look at the results, I have the feeling
>>>> that the program loads only one centroid point.
>>>>
>>>> I work with Flink 0.8.1; if I update to 0.9-milestone-1, I get the
>>>> following exception:
>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>> akka.actor.ActorInitializationException: exception during creation
>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>     ... 9 more
>>>>
>>>> How can I tell Flink that it should wait until the dataset is loaded,
>>>> and what does this exception mean?
>>>>
>>>> best regards,
>>>> paul
>>>>
>>>
>>>
>>
>