What do you mean by "does not enter ... class(es)"?

Does the log show that the scheduler ever accepts the job (You may have to
turn logging up)? Are "other" jobs that are submitted to the same class
under your user scheduled & executed? Wonder about which scheduler? What is
the definition for the scheduler class? Is it getting to a container? Let's
get a complete history of the steps you are getting please?


*.......*



*Daemeon C.M. ReiydelleUSA (+1) 415.501.0198London (+44) (0) 20 8144 9872*

On Sat, Sep 17, 2016 at 3:56 AM, Denis Mone <monede...@gmail.com> wrote:

> Hello hadoop users.
>
>     I am trying to implement a mapreduce KMeans algorithm using hadoop.
> The problem i have is that the code does not enter the map and reduce
> class. I'm running the application from Intellij Idea not using hadoop
> binary.
>
> The rest of the email is a sample of my code. If someone can see something
> that could help that would be greatly appreciated.
>
> Thanks in advance.
>
> Here is my driver code:
>
> Job job = 
> Job.getInstance(conf);job.setJobName("kmeans");job.setJarByClass(KMeans.class);FileInputFormat.addInputPath(job,
>  input);FileOutputFormat.setOutputPath(job, 
> output);job.setMapperClass(KMeansMapper.class);job.setReducerClass(KMeansReducer.class);job.setMapOutputKeyClass(PointVector.class);job.setMapOutputValueClass(PointVector.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.waitForCompletion(true);
>
> And below are my map and reduce classes:
>
> public class KMeansMapper extends Mapper<LongWritable, Text, PointVector, 
> PointVector> {
>
>     private int clusters;    private List<ImmutableTriple<Integer, String, 
> PointVector>> centers;    @Override    protected void setup(Context context) 
> throws IOException, InterruptedException {
>         System.out.println("Inside setup");        this.clusters = 
> Integer.valueOf(context.getConfiguration().get("clusters"));        
> this.centers = new ArrayList<>();        BufferedReader br = new 
> BufferedReader(new FileReader("/home/denis/centers"));        for(int i = 0; 
> i < clusters; i++) {
>             centers.add(DocumentRecordParser.parse(br.readLine()));        }
>         br.close();    }
>
>     @Override    public void map(LongWritable key, Text value, Context 
> context) throws IOException, InterruptedException {
>         PointVector line = 
> DocumentRecordParser.returnPointVector(value.toString());        
> System.out.println("Inside map!");        double minDist = Double.MAX_VALUE;  
>       double dist;        PointVector index = null;        EuclideanDistance 
> ed = new EuclideanDistance();        for (ImmutableTriple<Integer, String, 
> PointVector> c : centers) {
>             dist = ed.compute(line.points(), c.right.points());            if 
> (dist < minDist) {
>                 minDist = dist;                index = c.right;            }
>         }
>
>         context.write(index, line);    }
> }
>
> public class KMeansReducer extends Reducer<PointVector, PointVector, Text, 
> Text> {
>     private double min_dist = Double.MAX_VALUE;    @Override    public void 
> reduce(PointVector center, Iterable<PointVector> points, Context context) 
> throws IOException, InterruptedException {
>         EuclideanDistance measure = new EuclideanDistance();        double 
> distance = 0.0;        int numOfPoints = 0;        double diff = 0.0;        
> PointVector newCenter = null;        double [] sums = new 
> double[center.size()];        for (PointVector p : points) {
>             distance += measure.compute(center.points(), p.points());         
>    if (distance < min_dist) {
>                 min_dist = distance;                newCenter = p;            
> }
>             numOfPoints++;            sums = MathArrays.ebeAdd(p.points(), 
> sums);        }
>         for (int i = 0; i < sums.length; i++) {
>             sums[i] = sums[i] / numOfPoints;        }        
> System.out.println("Old center " + center + " new center: " + newCenter);     
>    context.write(new Text(newCenter.toString()) , new Text(new 
> PointVector(sums).toString()));    }
> }
>
> Last but not least my custom data structure class PointVector
>
> public class PointVector implements WritableComparable<PointVector> {
>     /**     * Keep the tfIdf values of the terms of a document     */    
> private Vector<Double> data = new Vector<>();    public PointVector(double [] 
> values) {
>         this.data = new Vector<>(values.length);        
> this.data.addAll(Doubles.asList(values));    }
>
>     public PointVector(List<Double> values) {
>         this.data = new Vector<>(values.size());        
> this.data.addAll(values);    }
>
>     public PointVector(String [] values) {
>         this.data = new Vector<>(values.length);        for (String s: 
> values) {
>             this.data.add(Double.valueOf(s));        }
>     }
>
>     public PointVector() {
>         this.data = new Vector<>();    }
>
>     public double[] points() {
>         return Doubles.toArray(data);    }
>
>     /**     * Subtract the values of this vector from the PointVector passed 
> as argument     * @param subtracted     * @return     */    public 
> PointVector sub(PointVector subtracted) {
>         int N = this.data.size();        double [] vals = new double[N];      
>   for (int i = 0; i < N; i++) {
>             vals[i] = this.data.get(i) - subtracted.get(i);        }
>         return new PointVector(vals);    }
>
>     public PointVector add(PointVector vec) {
>         int N = this.data.size();        double [] vals = new double[N];      
>   for (int i = 0; i < N; i++) {
>             vals[i] = this.data.get(i) + vec.get(i);        }
>         return new PointVector(vals);    }
>
>     /**     * Compute the dot product of this vector with the one passed as 
> argument     * @param vector     * @return     */    public double 
> dotProduct(PointVector vector) {
>         int N = this.data.size();        double sum = 0.0;        for (int i 
> = 0; i < N; i++) {
>             sum += this.data.get(i) * vector.get(i);        }
>         return sum;    }
>
>     @Override    public int compareTo(PointVector pointVector) {
>         return 0;    }
>
>     @Override    public void write(DataOutput dataOutput) throws IOException {
>         dataOutput.writeInt(data.size());        for (double d : data) {
>             dataOutput.writeDouble(d);        }
>     }
>
>     @Override    public void readFields(DataInput dataInput) throws 
> IOException {
>         int s = dataInput.readInt(); // read the size of the vector    }
>
>     public Double get(int i) {
>         return this.data.get(i);    }
>
>     public int size() {
>         return this.data.size();    }
>
>     @Override    public String toString() {
>         if (data.isEmpty()) {
>             return "[]";        }
>         StringBuilder sb = new StringBuilder();        sb.append("[");        
> for (double d : data) {
>             sb.append(d);            sb.append(", ");        }
>         final int pos = sb.lastIndexOf(",");        sb.delete(pos, pos + 1);  
>       sb.append("]");        return sb.toString();    }
> }
>
>

Reply via email to