package hive3;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hive.hcatalog.streaming.mutate.client.*;
import org.apache.hive.hcatalog.streaming.mutate.worker.*;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StrictJsonWriter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
 * Manual smoke test for the Hive 3 streaming ingest API.
 *
 * <p>Builds a {@link HiveConf} pointing at the metastore in {@code METASTORE_URI}, opens a
 * {@link HiveStreamingConnection} to {@code DATABASE.TABLE} using a {@link StrictJsonWriter},
 * writes one JSON record inside a single transaction, commits it and closes the connection.
 * HDFS settings for the target cluster are applied programmatically by
 * {@link #updateHiveConf(HiveConf)}.
 */
public class HiveStreamingTest {
    // NOTE(review): the following members are not referenced anywhere in this class; they look
    // like leftovers from the (hcatalog) mutation-API experiments. Consider removing them.
    private static final int[] BUCKET_COLUMN_INDEXES = new int[] { 0 };
    private static final int RECORD_ID_COLUMN = 2;

    /** Thrift URI of the Hive metastore used by the streaming connection. */
    private static final String METASTORE_URI = "thrift://10.33.105.126:9083";
    /** Target table name. */
    private static final String TABLE = "alerts1";
    /** Database containing {@link #TABLE}. */
    private static final String DATABASE = "testing";

    /**
     * HDFS HA nameservice ID. The value of {@code dfs.nameservices} must be the same ID that is
     * used as the suffix of every per-nameservice key (failover proxy provider, namenode list,
     * namenode RPC/HTTP/HTTPS addresses), otherwise those keys do not apply to the nameservice.
     */
    private static final String NAMESERVICE = "dev-fdp-hive3-cluster";

    private static final List<String> EUROPE_FRANCE = Arrays.asList("Europe", "France");
    private static final List<String> EUROPE_UK = Arrays.asList("Europe", "UK");
    private static final List<String> ASIA_INDIA = Arrays.asList("Asia", "India");

    public static void main(String[] args) {
        testStreamingInsert();
    }

    /**
     * Encodes a bucket id with {@link BucketCodec#V1}.
     *
     * @param bucketId the bucket number to encode
     * @return the V1-encoded bucket property
     */
    private static int encodeBucket(int bucketId) {
        return BucketCodec.V1.encode(
                new AcidOutputFormat.Options(null).bucket(bucketId));
    }

    /**
     * Streams a single JSON record into {@code DATABASE.TABLE} in one committed transaction.
     *
     * <p>Any {@link StreamingException} from connecting, writing or committing is printed to
     * stderr and the method returns normally. The connection, if one was opened, is always closed.
     */
    public static void testStreamingInsert() {
        // JSON record writer; each write() below carries one JSON object keyed by column name.
        StrictJsonWriter writer = StrictJsonWriter.newBuilder()
                .build();

        HiveConf hiveConf = new HiveConf(new Configuration(), HiveConf.class);
        hiveConf.set("hive.metastore.uris", METASTORE_URI);

        updateHiveConf(hiveConf);

        // Also load hdfs-site.xml from the classpath.
        // NOTE(review): a "/etc/hive/conf/hive-site.xml" path used to be declared here but was
        // never loaded; add hiveConf.addResource(new Path(...)) if that file is actually needed.
        hiveConf.addResource("hdfs-site.xml");

        StreamingConnection connection = null;
        try {
            connection = HiveStreamingConnection.newBuilder()
                    .withDatabase(DATABASE)
                    .withTable(TABLE)
                    .withAgentInfo("example-agent-1")
                    .withRecordWriter(writer)
                    .withHiveConf(hiveConf)
                    .connect();

            // No static partition values are supplied, so partitions are resolved dynamically from
            // the record. Presumably continent/country are the partition columns — confirm
            // against the table DDL.
            connection.beginTransaction();
            connection.write(("{\n" +
                    "                \"id\": 218,\n" +
                    "                \"msg\": \"val218\",\n" +
                    "                \"continent\": \"Africa8\",\n" +
                    "                \"country\": \"Egypt8\"\n" +
                    "                \n" +
                    "            }").getBytes(StandardCharsets.UTF_8));
            connection.commitTransaction();
        } catch (StreamingException e) {
            e.printStackTrace();
        } finally {
            // Always release the streaming connection, even if the transaction failed.
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Applies the target cluster's HDFS settings to {@code hiveConf}.
     *
     * <p>The values appear to be copied verbatim from the cluster's hdfs-site.xml; many of them
     * (datanode/journalnode/namenode directories and ports) look like server-side settings and are
     * presumably irrelevant to a client — kept as-is. The HA client keys are all derived from
     * {@link #NAMESERVICE} so that they agree with {@code dfs.nameservices}.
     *
     * @param hiveConf configuration to mutate in place
     */
    private static void updateHiveConf(HiveConf hiveConf) {
        hiveConf.set("dfs.block.access.token.enable", "true");
        hiveConf.set("dfs.blockreport.initialDelay", "300");
        hiveConf.set("dfs.blocksize", "536870912");
        hiveConf.set("dfs.client.block.write.locateFollowingBlock.retries", "8");
        hiveConf.set("dfs.client.failover.proxy.provider." + NAMESERVICE, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        hiveConf.set("dfs.client.read.shortcircuit", "true");
        hiveConf.set("dfs.client.read.shortcircuit.streams.cache.size", "200");
        hiveConf.set("dfs.cluster.administrators", "hdfs");
        hiveConf.set("dfs.datanode.address", "0.0.0.0:50010");
        hiveConf.set("dfs.datanode.balance.bandwidthPerSec", "5368709120");
        hiveConf.set("dfs.datanode.data.dir", "/grid/1/dfs/data");
        hiveConf.set("dfs.datanode.data.dir.perm", "750");
        hiveConf.set("dfs.datanode.failed.volumes.tolerated", "0");
        hiveConf.set("dfs.datanode.http.address", "0.0.0.0:50075");
        hiveConf.set("dfs.datanode.https.address", "0.0.0.0:50475");
        hiveConf.set("dfs.datanode.ipc.address", "0.0.0.0:8010");
        hiveConf.set("dfs.datanode.max.transfer.threads", "200");
        hiveConf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
        hiveConf.set("dfs.ha.automatic-failover.enabled", "true");
        hiveConf.set("dfs.ha.fencing.methods", "shell(/bin/true)");
        hiveConf.set("dfs.ha.namenodes." + NAMESERVICE, "nn1,nn2");
        hiveConf.set("dfs.heartbeat.interval", "10");
        hiveConf.set("dfs.hosts.exclude", "/etc/hadoop/conf/dfs.exclude");
        hiveConf.set("dfs.http.policy", "HTTP_ONLY");
        hiveConf.set("dfs.https.port", "50470");
        hiveConf.set("dfs.journalnode.edits.dir", "/grid/1/dfs/journal");
        hiveConf.set("dfs.journalnode.http-address", "0.0.0.0:8480");
        hiveConf.set("dfs.journalnode.https-address", "0.0.0.0:8481");
        hiveConf.set("dfs.namenode.accesstime.precision", "0");
        hiveConf.set("dfs.namenode.avoid.read.stale.datanode", "true");
        hiveConf.set("dfs.namenode.avoid.write.stale.datanode", "true");
        hiveConf.set("dfs.namenode.checkpoint.dir", "/grid/1/dfs/snn");
        hiveConf.set("dfs.namenode.checkpoint.edits.dir", "${dfs.namenode.checkpoint.dir}");
        hiveConf.set("dfs.namenode.checkpoint.period", "1800");
        hiveConf.set("dfs.namenode.checkpoint.txns", "1000000");
        hiveConf.set("dfs.namenode.handler.count", "100");
        hiveConf.set("dfs.namenode.http-address." + NAMESERVICE + ".nn1", "xxxx-0001:50070");
        hiveConf.set("dfs.namenode.http-address." + NAMESERVICE + ".nn2", "xxxx2:50070");
        hiveConf.set("dfs.namenode.https-address." + NAMESERVICE + ".nn1", "xxxx:50470");
        hiveConf.set("dfs.namenode.https-address." + NAMESERVICE + ".nn2", "xxxx2:50470");
        hiveConf.set("dfs.namenode.name.dir", "/grid/1/dfs/nn");
        hiveConf.set("dfs.namenode.name.dir.restore", "true");
        hiveConf.set("dfs.namenode.rpc-address." + NAMESERVICE + ".nn1", "xx-0001:8020");
        hiveConf.set("dfs.namenode.rpc-address." + NAMESERVICE + ".nn2", "xx-0002:8020");
        hiveConf.set("dfs.namenode.safemode.threshold-pct", "0.99f");
        hiveConf.set("dfs.namenode.secondary.http-address", "localhost:50090");
        hiveConf.set("dfs.namenode.service.handler.count", "30");
        hiveConf.set("dfs.namenode.shared.edits.dir", "qjournal://xx-0001:8485;xx:8485;xx-0003:8485/dev-fdp-hive3-cluster");
        hiveConf.set("dfs.namenode.stale.datanode.interval", "30000");
        hiveConf.set("dfs.namenode.startup.delay.block.deletion.sec", "3600");
        hiveConf.set("dfs.namenode.write.stale.datanode.ratio", "1.0f");
        // Must name the same nameservice as the per-nameservice keys above.
        hiveConf.set("dfs.nameservices", NAMESERVICE);
        hiveConf.set("dfs.permissions.enabled", "true");
        hiveConf.set("dfs.permissions.superusergroup", "hdfs");
        hiveConf.set("dfs.replication", "3");
        hiveConf.set("dfs.replication.max", "10");
        hiveConf.set("dfs.support.append", "true");
        hiveConf.set("dfs.webhdfs.enabled", "true");
        hiveConf.set("fs.permissions.umask-mode", "022");
        hiveConf.set("nfs.dump.dir", "/tmp/.hdfs-nfs");
        hiveConf.set("nfs.exports.allowed.hosts", "* rw");
    }
}

