package bulk_test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;


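/**
 * Spark job that reads rows from the Phoenix STOCKS table through
 * {@link org.apache.phoenix.mapreduce.PhoenixInputFormat}, populates the
 * RECORDINGS_AVG column for each row (currently with a placeholder value),
 * and upserts the rows back into STOCKS through
 * {@link org.apache.phoenix.mapreduce.PhoenixOutputFormat}.
 * Assumes a local Spark master and a ZooKeeper quorum at localhost:2181.
 */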
public class PhoenixSparkJob {

    private static final String SPARK_MASTER_URL     = "local[*]";

    private static final String ZOOKEEPER_QUORUM_URL = "localhost:2181";

    public static void main(String[] args) throws IOException {

        final SparkConf sparkConf = new SparkConf()
                .setAppName("phoenix-spark")
                .set("spark.executor.memory", "2g")
                // Spark reads this flag from the SparkConf (not from the Hadoop Configuration),
                // so it is set here to skip output-spec validation in saveAsNewAPIHadoopDataset.
                .set("spark.hadoop.validateOutputSpecs", "false")
                .setMaster(SPARK_MASTER_URL);

        final JavaSparkContext jsc = new JavaSparkContext(sparkConf);

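        // Hadoop/HBase/Phoenix configuration shared by PhoenixInputFormat (read side)
        // and PhoenixOutputFormat (write side).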
        final Configuration configuration = HBaseConfiguration.create();
        configuration.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_QUORUM_URL);

        // Read from and write back to the same STOCKS table.
        final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCKS";
        PhoenixConfigurationUtil.setInputTableName(configuration, "STOCKS");
        PhoenixConfigurationUtil.setOutputTableName(configuration, "STOCKS");
        PhoenixConfigurationUtil.setInputQuery(configuration, selectQuery);
        PhoenixConfigurationUtil.setInputClass(configuration, StockWritable.class);
        System.out.println("Query::" + selectQuery);

        // Columns written by the generated UPSERT statement on the output side.
        final String[] upsertColumns = {"STOCK_NAME", "RECORDING_YEAR", "RECORDINGS_AVG"};
        PhoenixConfigurationUtil.setUpsertColumnNames(configuration, upsertColumns);

        // saveAsNewAPIHadoopDataset reads the output format class from this configuration.
        configuration.setClass("mapreduce.job.outputformat.class", PhoenixOutputFormat.class, OutputFormat.class);
         
        // Load the STOCKS rows; PhoenixInputFormat emits NullWritable keys with the
        // configured DBWritable (StockWritable) instances as values.
        @SuppressWarnings("unchecked")
        JavaPairRDD<NullWritable, StockWritable> stocksRDD = jsc.newAPIHadoopRDD(
                configuration,
                PhoenixInputFormat.class,
                NullWritable.class,
                StockWritable.class);

        // System.out.println(String.format("the number of records are [%s]", stocksRDD.count()));
         
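        // For every row, populate RECORDINGS_AVG and upsert the row back into STOCKS
        // via the PhoenixOutputFormat configured above.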
        stocksRDD.mapToPair(new PairFunction<Tuple2<NullWritable, StockWritable>, NullWritable, StockWritable>() {

            @Override
            public Tuple2<NullWritable, StockWritable> call(Tuple2<NullWritable, StockWritable> tuple) throws Exception {
                final StockWritable stockWritable = tuple._2();
                /* Intended average calculation, currently disabled (requires the StockBean type):
                final StockBean bean = stockWritable.getStockBean();
                double[] recordings = bean.getRecordings();
                double sum = 0.0;
                for (double recording : recordings) {
                    sum += recording;
                }
                double avg = sum / recordings.length;
                bean.setAverage(avg); */
                // Placeholder value until the average calculation above is wired in.
                stockWritable.setRECORDINGS_AVG(new Text("123"));
                return new Tuple2<NullWritable, StockWritable>(NullWritable.get(), stockWritable);
            }
        }).saveAsNewAPIHadoopDataset(configuration);
        
        jsc.stop();
    }
}