Hi
I'm using this environment:

   - Apache Hadoop 1.2.1
   - Apache HBase  0.96 hadoop1
   - Apache Mahout 0.8
   - Spring 3.2

I have to do some clustering on records stored in my HBase table. Records
have these properties:

   - idArco: the arc ID of the route I'm considering
   - velocitaMedia: medium velocity on that route
   - matchingQuality: quality of the misuration
   - startDate: misuration start date
   - endDate: misuration start date
   - vehiclesNumber: number of vehicles involved in the misuration
   - meseAnno: misuration month of the year
   - giornoSettimana: week day of the misuration
   - oraGiorno: day hour of the misuration
   - calendarioFestivo: numeric representing if there has been some kind of
   festivity during the misuration
   - calendarioEventi: numeric representing if there has been some kind of
   event during the misuration
   - eventoMeteo: numeric representing the weather situation during the
   misuration
   - manifestazione: numeric representing if there has been some kind of
   manifestation during the misuration
   - annoMisurazione: year of the misuration
   - tipoStrada: route tipology (if it's highway, normal way and so on...)
   - minutoOra: minutes of the misuration hour
   - startDateLong: misuration start date in millis
   - endDateLong: misuration final date in millis
   - idCluster: record membership cluster id

I need to realize the cluster according to the following properties of my
records: calendarioFestivo, calendarioEventi, eventoMeteo, manifestazione
(and maybe some othe properties like oraGiorno, minutoOra and so on...but
for now it's good to stop here; if I did my work correctly I guess it will
be simple to add some other parameters); the final result should be
something like this:

   - cluster 0 is the cluster for a well known weather condition, with a
   well known kind of festivity and event and so on...
   - cluster 1 is the cluster for another well known weather condition,
   with a well known kind of festivity and event and so on...

In order to realize the clustering I'm using Mahout KMeans Implementation
and in order to find the right K value I used Canopy.
The first thing I did is to take all data from my HBase by using the HBase
MapReduce funcionality; by all these data I wrote the SequenceFile
representing the input data for the clustering algorithm, so I wrote this
code:
//*Map side*:
protected void map(ImmutableBytesWritable key, Result result,
org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException,
InterruptedException {
try{
Double calFest = //Taken from DB
Double calEven = //Taken from DB
Double meteo = //Taken from DB
Double manifestazione = //Taken from DB
Double tipologiaStrada = //Taken from DB
String chiave = Bytes.toString(result.getRow());
Text text = new Text();
text.set(chiave);
DenseVector dv = new DenseVector(new double[]{calFest, calEven, meteo,
manifestazione, tipologiaStrada});
NamedVector nv = new NamedVector(dv, chiave);
context.write(text, new VectorWritable(nv));
}catch(Exception e){
//Handled e
}
}
//*Reduce side*
    public static class HistoricalDataReducer extends Reducer<Text,
VectorWritable, Text, VectorWritable> {
        private SequenceFile.Writer sfWriter;
        protected void cleanup(Context context) throws IOException,
InterruptedException {
            try {
                sfWriter.close();
            } catch (Exception e) {
              //Handled e
            }
        }
        protected void setup(Context context) throws IOException,
InterruptedException {
            try {
                Path inputDataSeq = new Path("somePath");
                Configuration conf = new Configuration();
                HadoopUtil.delete(conf, inputDataSeq);
                FileSystem fs = FileSystem.get(conf);
                this.sfWriter = new SequenceFile.Writer(fs, conf,
inputDataSeq, Text.class, VectorWritable.class);
            } catch (Exception e) {
                //Handled e
            }
        }
        protected void reduce(Text key, Iterable<VectorWritable> values,
Context context) throws IOException, InterruptedException {
            try{
                Iterator<VectorWritable> iterator = values.iterator();
                while (iterator.hasNext()) {
                    VectorWritable vectorWritable = iterator.next();
                    NamedVector nv = (NamedVector)vectorWritable.get();
                    this.sfWriter.append(new Text( nv.getName()),
vectorWritable);
                    context.write(key, vectorWritable);
                }
            }catch(Exception e){
                //Handled e
            }
        }
        }

Then I wrote this class for the cluster analysis:
//Import stuff
@Service
public class ClusterAnalysisSvcImpl implements IClusterAnalysisSvc {
//Some configuration properties
@Override
public void executeClusterAnalysis() throws ClusterAnalysisException {

try {
DistanceMeasure measure = new EuclideanDistanceMeasure();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
writeInputData(fs, conf);
writeClusters(fs, conf, measure);
Path inputData = new Path(inputDataLocation);
Path inputClaster = new Path(inputClusterDirectory);
Path outputResults = new Path(outputClusterDirectory);
HadoopUtil.delete(conf, outputResults);
sw.start("fase di clustering");
KMeansDriver.run(conf,
inputData,
inputClaster,
outputResults,
measure,
convergenceDelta,
iterationNumber,
true,
0,
false);
} catch (Exception e) {
//Handled e
}
}
private List<Canopy> getCanopies(FileSystem fs, Configuration conf,
DistanceMeasure measure) throws Exception{
SequenceFile.Reader reader = null;
try{
reader = new SequenceFile.Reader(fs, new Path(inputDataLocation), conf);
Text key = new Text();
VectorWritable value = new VectorWritable();
List<Vector> elementi = new ArrayList<Vector>();
while(reader.next(key, value)){
NamedVector nv = (NamedVector)value.get();
elementi.add(nv);
}
if( t1<=t2 ){
throw new IllegalArgumentException("Impossibile proseguire; le soglie di
distanza Canopy non sono corrette; t1 รจ minore o uguale a t2: t1="+t1+"
t2="+t2);
}
//Individuo i canopy
List<Canopy> canopies = CanopyClusterer.createCanopies(elementi, measure,
t1, t2);
return canopies;
}finally{
if( reader != null ){
reader.close();
}
}
}
private void writeClusters(FileSystem fs, Configuration conf,
DistanceMeasure measure) throws Exception{
SequenceFile.Writer writer = null;
try {
Path path = new Path(clusterPathFile);
HadoopUtil.delete(conf, path);
List<Canopy> canopies = getCanopies(fs, conf, measure);
writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
for (Canopy canopy : canopies) {
Kluster clust = new Kluster(canopy.getCenter(), canopy.getId(), measure);
writer.append(new Text( (clust.getIdentifier()) ), clust);
}
}finally {
if( writer != null ){
writer.close();
}
}
}
private void writeInputData(FileSystem fs, Configuration conf) throws
Exception{
try {
//Calling the HBase MapReduce utility
}
}

In this way, by using a dataset of 28800 records, I found 360 clusters.
Since I'm newbie I'ld like to know what you, experts, think about my
approach; is this a good way to realize the cluster analysis? Or should I
do it in other way?

Thank you

Reply via email to