Hi,
if I print the centroids, all of them show up in the output. I have also
implemented k-means with MapReduce and Spark; with the same input, both give
the same output. In Flink, however, I get only one cluster for this input
set. (I use CSV files from the GDELT project.)
Here is my class:
public class FlinkMain {

    public static void main(String[] args) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = 1; // Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = pro.getProperty("flink.output");

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);

        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);

        DataSet<GeoTimeDataCenter> newCentroids = points
                // compute closest centroid for each point
                .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
                // count and sum point coordinates for each centroid
                .groupBy(0).reduce(new CentroidAccumulator())
                // compute new centroids from point counts and coordinate sums
                .map(new CentroidAverager());

        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);

        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
                // assign points to final clusters
                .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

        // emit result
        clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
        finalCentroids.writeAsText(outputPath + "/centers"); // print();

        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static final class SelectNearestCenter
            extends RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
        private static final long serialVersionUID = -2729445046389350264L;
        private Collection<GeoTimeDataCenter> centroids;

        @Override
        public void open(Configuration parameters) throws Exception {
            this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
        }

        @Override
        public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
            double minDistance = Double.MAX_VALUE;
            int closestCentroidId = -1;
            // check all cluster centers
            for (GeoTimeDataCenter centroid : centroids) {
                // compute distance
                double distance = Distance.ComputeDist(point, centroid);
                // update nearest cluster if necessary
                if (distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = centroid.getId();
                }
            }
            // emit a new record with the center id and the data point
            return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
        }
    }
    // merges two points of the same cluster by averaging them pairwise
    // (this is not a running sum and count)
    private static final class CentroidAccumulator
            implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
        private static final long serialVersionUID = -4868797820391121771L;

        public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1,
                Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1, val2.f1));
        }
    }
    private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
        long time = (input1.getTime() + input2.getTime()) / 2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);
        return new GeoTimeDataTupel(geo, time, "POINT");
    }
    // converts the merged point of each cluster into the new centroid
    private static final class CentroidAverager
            implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
        private static final long serialVersionUID = -2687234478847261803L;

        public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
            return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
        }
    }
    private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
                .ignoreInvalidLines()
                .fieldDelimiter('\u0009')
                //.fieldDelimiter("\t")
                //.lineDelimiter("\n")
                .includeFields(
                        true, true, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false, true, true,
                        false, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false)
                //.includeFields(true,true,true,true)
                .types(String.class, Long.class, Double.class, Double.class)
                .map(new TuplePointConverter());
    }
    private static final class TuplePointConverter
            implements MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
        private static final long serialVersionUID = 3485560278562719538L;

        public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
            return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
        }
    }
    private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String seedFile = pro.getProperty("seed.file");
        boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
        // get points from file or random
        if (seedFlag || !(new File(seedFile + "-1").exists())) {
            Seeding.randomSeeding();
        }
        // map csv file
        return env.readCsvFile(seedFile + "-1")
                .lineDelimiter("\n")
                .fieldDelimiter('\u0009')
                //.fieldDelimiter("\t")
                .includeFields(true, true, true, true)
                .types(Integer.class, Double.class, Double.class, Long.class)
                .map(new TupleCentroidConverter());
    }
    private static final class TupleCentroidConverter
            implements MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
        private static final long serialVersionUID = -1046538744363026794L;

        public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
            return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
        }
    }
}
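
A side note on CentroidAccumulator: addAndDiv halves the result at every
reduce step, so what comes out of the reduce depends on the order in which
the pairs are combined and is in general not the mean of the cluster. This
may or may not be related to the single-cluster output. The sketch below
shows the effect on plain numbers and compares it with a sum-and-count
mean. It is only an illustration: the class PairwiseAverageDemo and its
method names are made up here, and it uses a single coordinate instead of
GeoTimeDataTupel.

```java
import java.util.Arrays;
import java.util.List;

class PairwiseAverageDemo {

    // Mimics addAndDiv for one coordinate: the midpoint of two values.
    static double pairwiseAverage(double a, double b) {
        return (a + b) / 2;
    }

    // Folds the values left to right with pairwiseAverage,
    // which is one possible order a reduce could combine them in.
    static double reduceByPairwiseAverage(List<Double> values) {
        double acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = pairwiseAverage(acc, values.get(i));
        }
        return acc;
    }

    // Keeps a running sum and count and divides only once at the end.
    static double meanBySumAndCount(List<Double> values) {
        double sum = 0.0;
        long count = 0;
        for (double v : values) {
            sum += v;
            count++;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        List<Double> forward = Arrays.asList(0.0, 0.0, 8.0);
        List<Double> backward = Arrays.asList(8.0, 0.0, 0.0);

        System.out.println(reduceByPairwiseAverage(forward));  // prints 4.0
        System.out.println(reduceByPairwiseAverage(backward)); // prints 2.0
        System.out.println(meanBySumAndCount(forward));        // about 2.67 in either order
    }
}
```

If the true mean is wanted, one option is to carry the coordinate sums and
a point count through the reduce and divide only in CentroidAverager, as
the KMeans example linked in the quoted mail below does.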
2015-05-21 14:22 GMT+02:00 Till Rohrmann:
> Concerning your first problem that you only see one resulting centroid,
> your code looks good modulo the parts you haven't posted.
>
> However, your problem could simply be caused by a bad selection of initial
> centroids. If, for example, all centroids except for one don't get any
> points assigned, then only one centroid will survive the iteration step.
> How do you do it?
>
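
The effect described in the paragraph above can be reproduced without
Flink. The following is a minimal plain-Java sketch of one k-means step;
the class EmptyClusterDemo, the 1-D points and the seed values are
assumptions made up for illustration and are not taken from the job or the
GDELT data. Centroids that receive no points never form a group, so they
are missing from the result of the step.

```java
import java.util.Map;
import java.util.TreeMap;

class EmptyClusterDemo {

    // One k-means step on 1-D points: assign each point to its nearest
    // centroid, then average the points per centroid id. A centroid that
    // gets no points produces no group and therefore no new centroid.
    static Map<Integer, Double> step(double[] points, Map<Integer, Double> centroids) {
        // per centroid id: [0] = sum of assigned points, [1] = number of points
        Map<Integer, double[]> sumAndCount = new TreeMap<>();
        for (double p : points) {
            int nearestId = -1;
            double minDistance = Double.MAX_VALUE;
            for (Map.Entry<Integer, Double> c : centroids.entrySet()) {
                double distance = Math.abs(p - c.getValue());
                if (distance < minDistance) {
                    minDistance = distance;
                    nearestId = c.getKey();
                }
            }
            double[] acc = sumAndCount.computeIfAbsent(nearestId, id -> new double[2]);
            acc[0] += p;
            acc[1] += 1;
        }
        Map<Integer, Double> next = new TreeMap<>();
        for (Map.Entry<Integer, double[]> e : sumAndCount.entrySet()) {
            next.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return next;
    }

    public static void main(String[] args) {
        double[] points = {1.0, 2.0, 3.0};
        Map<Integer, Double> seeds = new TreeMap<>();
        seeds.put(1, 2.5);   // close to all points
        seeds.put(2, 100.0); // far away, gets no points
        seeds.put(3, 200.0); // far away, gets no points

        System.out.println(step(points, seeds)); // prints {1=2.0}
    }
}
```

With seeds like these, three centroids go in and one comes out after a
single step, which would match a one-cluster result even though all seeds
were read correctly.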
> To check that all centroids are read you can print the contents of the
> centroids DataSet. Furthermore, you can simply println the new centroids
> after each iteration step. In local mode you can then observe the
> computation.
>
> Cheers,
> Till
>
> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen wrote:
>
>> Hi!
>>
>> This problem should not depend on any user code. There are no user-code
>> dependent actors in Flink.
>>
>> Is there more of the stack trace that you can send us? It looks like the
>> core exception that is causing the issue is not part of the stack trace.
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö wrote:
>>
>>> Hi Flink community,
>>>
>>> I have implemented k-means for clustering temporal geo data. I use the
>>> following GitHub project with my own data structure:
>>>
>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>
>>> Now I have the problem that Flink reads the centroids from a file and
>>> continues working in parallel. Looking at the results, I have the feeling
>>> that the program loads only one centroid point.
>>>
>>> I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the
>>> following exception:
>>> ERROR actor.OneForOneStrategy: exception during creation
>>> akka.actor.ActorInitializationException: exception during creation
>>> at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>> at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>> at akka.actor.Props.newActor(Props.scala:337)
>>> at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>> at akka.actor.ActorCell.create(ActorCell.scala:560)
>>> ... 9 more
>>>
>>> How can I tell Flink to wait until the dataset is loaded, and what does
>>> this exception mean?
>>>
>>> Best regards,
>>> Paul
>>>
>>
>>
>