public class Application {
    static private Logger log  = Logger.getLogger(Application.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NMS Tuning Engine");
        JavaSparkContext sc = new JavaSparkContext(conf);

        try {
            testJdbc(sc);
            testSpark(sc);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static public void testSpark(JavaSparkContext sc) throws Exception {
        //SparkContextBuilder.buildSparkContext("Simple Application", "local");

        // One JVM can only have one Spark Context now
        Map<String, String> options = new HashMap<String, String>();
        SQLContext sqlContext = new SQLContext(sc);

        String tableStr = "\"ACME:ENDPOINT_STATUS\"";
        String dataSrcUrl="jdbc:phoenix:luna-sdp-nms-01.davis.sensus.lab:2181:/hbase-unsecure";
        options.put("zkUrl", dataSrcUrl);
        options.put("table", tableStr);
        log.info("Phoenix DB URL: " + dataSrcUrl + " tableStr: " + tableStr);

        DataFrame df = null;
        try {
            df = sqlContext.read().format("org.apache.phoenix.spark").options(options).load();
            df.explain(true);
        } catch (Exception ex) {
            log.error("sql error: ", ex);
        }

        try {
            log.info ("Count By phoenix spark plugin: "+ df.count());
        } catch (Exception ex) {
            log.error("dataframe error: ", ex);
        }

    }

    static public void testJdbc(JavaSparkContext sc) throws Exception {
        Map<String, String> options = new HashMap<String, String>();
        SQLContext sqlContext = new SQLContext(sc);

        if (sc == null || sqlContext == null || options == null) {
            log.info("NULL sc, sqlContext, or options");
        }

        String qry2 = "(Select ENDPOINT_ID, CITY from \"ACME:ENDPOINT_STATUS\" Where city = 'ACME City')";
        String dataSrcUrl="jdbc:phoenix:luna-sdp-nms-01.davis.sensus.lab:2181:/hbase-unsecure";
        options.put("url", dataSrcUrl);
        options.put("dbtable", qry2);
        log.info("Phoenix DB URL: " + dataSrcUrl + "\nquery: " + qry2);

        DataFrame df = null;
        try {
            DataFrameReader dfRd = sqlContext.read().format("jdbc").options(options);

            if (dfRd == null) {
                log.error("NULL DataFrameReader Object dfRd in getEndPointDataByJdbc");
            }
            df = dfRd.load();
            df.explain(true);

        } catch (Exception ex) {
            log.error("sql error: ", ex);
        }

        try {
            log.info ("Count By Jdbc: "+ df.count());
        } catch (Exception ex) {
            log.error("dataframe error: ", ex);
        }
    }


}
