package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

@SuppressWarnings("deprecation")
public class TestMultipleOutputs
{
  /** JobConf key holding the comma-separated list of named-output base names. */
  public final static String OUTPUT_NAMES = "OUTPUT_NAMES";

  /** Number of named outputs registered by {@link #main(String[])}. */
  private static final int NUM_OUTPUTS = 12;

  /**
   * Map-only task that echoes every input line to each configured named
   * output. The key is discarded ({@link NullWritable}); the value is the
   * raw input line. The default output collector is intentionally unused.
   */
  static class TestMapper extends MapReduceBase implements Mapper<LongWritable, Text, NullWritable, Text>
  {
    String[] opNames;
    MultipleOutputs mop;

    public TestMapper()
    {

    }

    /**
     * Reads the named-output list set by {@code main()} and opens the
     * {@code MultipleOutputs} facade for this task.
     *
     * @param job task configuration; must contain {@link #OUTPUT_NAMES}
     *            (a missing value NPEs here, acceptable for a test driver)
     */
    @SuppressWarnings("unchecked")
    @Override
    public void configure(JobConf job)
    {
      opNames = job.get(OUTPUT_NAMES).split(",");
      mop = new MultipleOutputs(job);
      for (String opName : opNames)
      {
        // Task-log trace of which outputs this mapper will write to.
        System.out.println(opName);
      }
    }

    /**
     * Writes the input line to every named output.
     *
     * @param key      byte offset of the line (ignored)
     * @param value    the input line, fanned out verbatim
     * @param output   default collector (deliberately unused)
     * @param reporter passed through so MultipleOutputs can report progress
     * @throws IOException if any underlying record writer fails
     */
    @SuppressWarnings("unchecked")
    @Override
    public void map(LongWritable key, Text value, OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException
    {
      for (String opName : opNames)
      {
        mop.getCollector(opName, reporter).collect(NullWritable.get(), value);
      }
    }

    /**
     * Flushes and closes all named-output record writers.
     *
     * <p>BUG FIX: without this override the buffered writers inside
     * {@code MultipleOutputs} are never closed, so the named-output files
     * may be left truncated or empty when the task finishes.
     *
     * @throws IOException if closing an underlying writer fails
     */
    @Override
    public void close() throws IOException
    {
      if (mop != null)
      {
        mop.close();
      }
    }
  }

  /**
   * Configures, submits, and monitors the map-only job.
   *
   * @param args {@code args[0]} = input path, {@code args[1]} = output path
   * @throws Exception on submission or monitoring failure
   */
  public static void main(String[] args) throws Exception
  {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2)
    {
      System.err.println("Usage: TestMultipleOutputs <input path> <output path>");
      System.exit(2);
    }
    JobConf job = new JobConf();
    // Placeholder endpoints -- replace before running against a real cluster.
    job.set("mapred.job.tracker", "<<jobtracker>>");
    job.set("fs.default.name", "<<hdfs>>");
    job.setJarByClass(TestMultipleOutputs.class);
    job.setMapperClass(TestMultipleOutputs.TestMapper.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormat(TextInputFormat.class);
    // Map-only job: mapper output goes straight to the named outputs.
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Register Output0..Output{N-1} and record their names for the mapper.
    StringBuilder names = new StringBuilder();
    for (int i = 0; i < NUM_OUTPUTS; i++)
    {
      String name = "Output" + i;
      MultipleOutputs.addNamedOutput(job, name, TextOutputFormat.class, NullWritable.class, Text.class);
      if (names.length() > 0)
      {
        names.append(',');
      }
      names.append(name);
    }
    job.set(OUTPUT_NAMES, names.toString());
    JobClient jc = new JobClient(job);
    RunningJob rJob = jc.submitJob(job);
    jc.monitorAndPrintJob(job, rJob);
  }
}
