HIVE-16266 : Enable function metadata to be written during bootstrap (Anishek Agarwal, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2985262b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2985262b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2985262b

Branch: refs/heads/master
Commit: 2985262b895cb00b22935d0ff660a6003082b631
Parents: 9945b5d
Author: Sushanth Sowmyan <khorg...@gmail.com>
Authored: Mon Apr 3 15:30:58 2017 -0700
Committer: Sushanth Sowmyan <khorg...@gmail.com>
Committed: Mon Apr 3 15:32:19 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       |   8 +-
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 104 +-----
 .../hive/ql/parse/FunctionSemanticAnalyzer.java |   8 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 355 +++++--------------
 .../hive/ql/parse/repl/dump/DBSerializer.java   |  54 +++
 .../ql/parse/repl/dump/FunctionSerializer.java  |  48 +++
 .../hive/ql/parse/repl/dump/JsonWriter.java     |  54 +++
 .../ql/parse/repl/dump/PartitionSerializer.java |  64 ++++
 .../repl/dump/ReplicationSpecSerializer.java    |  36 ++
 .../ql/parse/repl/dump/TableSerializer.java     | 113 ++++++
 .../repl/dump/VersionCompatibleSerializer.java  |  37 ++
 .../ql/parse/repl/events/AbstractHandler.java   |  46 +++
 .../parse/repl/events/AddPartitionHandler.java  | 114 ++++++
 .../repl/events/AlterPartitionHandler.java      | 103 ++++++
 .../ql/parse/repl/events/AlterTableHandler.java |  92 +++++
 .../parse/repl/events/CreateTableHandler.java   |  86 +++++
 .../ql/parse/repl/events/DefaultHandler.java    |  43 +++
 .../parse/repl/events/DropPartitionHandler.java |  43 +++
 .../ql/parse/repl/events/DropTableHandler.java  |  43 +++
 .../hive/ql/parse/repl/events/EventHandler.java |  62 ++++
 .../parse/repl/events/EventHandlerFactory.java  |  75 ++++
 .../ql/parse/repl/events/InsertHandler.java     |  96 +++++
 .../repl/events/TestEventHandlerFactory.java    |  44 +++
 23 files changed, 1357 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 9e79b6a..2688f35 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -1251,8 +1251,6 @@ public class TestReplicationScenarios {
     assertFalse(EventUtils.andFilter(no, yes, no).accept(dummyEvent));
     assertFalse(EventUtils.andFilter(no, no, yes).accept(dummyEvent));
     assertFalse(EventUtils.andFilter(no, no, no).accept(dummyEvent));
-
-
   }
 
   private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) {
@@ -1283,7 +1281,7 @@ public class TestReplicationScenarios {
     if (tblName != null){
       verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
     }
-    assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+    assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));
     return lastReplDumpId;
   }
 
@@ -1298,7 +1296,7 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'");
     verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId);
     verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
-    assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+    assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));
     return lastReplDumpId;
   }
 
@@ -1392,7 +1390,7 @@ public class TestReplicationScenarios {
     return success;
   }
 
-  public static void createTestDataFile(String filename, String[] lines) throws IOException {
+  private static void createTestDataFile(String filename, String[] lines) throws IOException {
     FileWriter writer = null;
     try {
       File file = new File(filename);

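The two assertion changes above matter because REPL dump/status ids are numeric strings, and String.compareTo orders them lexicographically, so the old check breaks as soon as the ids cross a digit boundary. A minimal, self-contained illustration (class and variable names here are illustrative only, not part of the patch):

public class ReplIdOrderingDemo {
  public static void main(String[] args) {
    String prevReplDumpId = "9";
    String lastReplDumpId = "10";

    // Lexicographic comparison: "10" sorts before "9", so the old assertion
    // would fail even though the dump id actually advanced.
    System.out.println(lastReplDumpId.compareTo(prevReplDumpId) > 0);  // false

    // Numeric comparison, as the patched test now does, reflects the real ordering.
    System.out.println(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId));  // true
  }
}
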
http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 10cc286..1ea5182 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Function;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -34,26 +33,24 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.dump.ReplicationSpecSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
 import javax.annotation.Nullable;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -240,34 +237,16 @@ public class EximUtil {
   /* If null, then the major version number should match */
   public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null;
 
-  public static void createDbExportDump(
-      FileSystem fs, Path metadataPath, Database dbObj,
+  public static void createDbExportDump(FileSystem fs, Path metadataPath, Database dbObj,
       ReplicationSpec replicationSpec) throws IOException, SemanticException {
 
     // WARNING NOTE : at this point, createDbExportDump lives only in a world where ReplicationSpec is in replication scope
     // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using
     // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
 
-    OutputStream out = fs.create(metadataPath);
-    JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
-    jgen.writeStartObject();
-    jgen.writeStringField("version",METADATA_FORMAT_VERSION);
-    dbObj.putToParameters(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState());
-
-    if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
-      jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
-    }
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    try {
-      jgen.writeStringField("db", serializer.toString(dbObj, "UTF-8"));
-    } catch (TException e) {
-      throw new SemanticException(
-          ErrorMsg.ERROR_SERIALIZE_METASTORE
-              .getMsg(), e);
+    try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
+      new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec);
     }
-
-    jgen.writeEndObject();
-    jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
   }
 
   public static void createExportDump(FileSystem fs, Path metadataPath,
@@ -283,73 +262,12 @@ public class EximUtil {
       replicationSpec.setNoop(true);
     }
 
-    OutputStream out = fs.create(metadataPath);
-    JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
-    jgen.writeStartObject();
-    jgen.writeStringField("version",METADATA_FORMAT_VERSION);
-    if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
-      jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
-    }
-
-    if (replicationSpec.isInReplicationScope()){
-      for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()){
-        String value = replicationSpec.get(key);
-        if (value != null){
-          jgen.writeStringField(key.toString(), value);
-        }
-      }
-      if (tableHandle != null){
-        Table ttable = tableHandle.getTTable();
-        ttable.putToParameters(
-            ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState());
-        if ((ttable.getParameters().containsKey("EXTERNAL")) &&
-            (ttable.getParameters().get("EXTERNAL").equalsIgnoreCase("TRUE"))){
-          // Replication destination will not be external - override if set
-          ttable.putToParameters("EXTERNAL","FALSE");
-        }
-        if (ttable.isSetTableType() && ttable.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString())){
-          // Replication dest will not be external - override if set
-          ttable.setTableType(TableType.MANAGED_TABLE.toString());
-        }
-      }
-    } else {
-      // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
-      // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
-      // TODO: if we want to be explicit about this dump not being a replication dump, we can
-      // uncomment this else section, but currently unnneeded. Will require a lot of golden file
-      // regen if we do so.
-    }
-    if ((tableHandle != null) && (!replicationSpec.isNoop())){
-      TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-      try {
-        jgen.writeStringField("table", serializer.toString(tableHandle.getTTable(), "UTF-8"));
-        jgen.writeFieldName("partitions");
-        jgen.writeStartArray();
-        if (partitions != null) {
-          for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
-            Partition tptn = partition.getTPartition();
-            if (replicationSpec.isInReplicationScope()){
-              tptn.putToParameters(
-                  ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState());
-              if ((tptn.getParameters().containsKey("EXTERNAL")) &&
-                  (tptn.getParameters().get("EXTERNAL").equalsIgnoreCase("TRUE"))){
-                // Replication destination will not be external
-                tptn.putToParameters("EXTERNAL", "FALSE");
-              }
-            }
-            jgen.writeString(serializer.toString(tptn, "UTF-8"));
-            jgen.flush();
-          }
-        }
-        jgen.writeEndArray();
-      } catch (TException e) {
-        throw new SemanticException(
-            ErrorMsg.ERROR_SERIALIZE_METASTORE
-                .getMsg(), e);
+    try (JsonWriter writer = new JsonWriter(fs, metadataPath)) {
+      if (replicationSpec.isInReplicationScope()) {
+        new ReplicationSpecSerializer().writeTo(writer, replicationSpec);
       }
+      new TableSerializer(tableHandle, partitions).writeTo(writer, replicationSpec);
     }
-    jgen.writeEndObject();
-    jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
   }
 
   /**

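The net effect of the EximUtil change is that all JSON plumbing (stream creation, the version field, the enclosing object, and closing) now lives in JsonWriter, while each metadata fragment is produced by a dedicated serializer. A condensed sketch of the resulting call shape for the database dump path; the helper class and method names below are illustrative only, while the JsonWriter/DBSerializer usage mirrors the patch:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;

import java.io.IOException;

class DbDumpSketch {
  // Hypothetical helper showing how the refactored export path composes the new classes.
  static void dumpDb(FileSystem fs, Path metadataPath, Database dbObj, ReplicationSpec spec)
      throws IOException, SemanticException {
    // JsonWriter opens the output stream and writes the "version" field up front;
    // closing it (here via try-with-resources) ends the JSON object and the stream.
    try (JsonWriter writer = new JsonWriter(fs, metadataPath)) {
      // DBSerializer stamps the current replication state on the Database object
      // and writes it as a Thrift-JSON "db" field.
      new DBSerializer(dbObj).writeTo(writer, spec);
    }
  }
}
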
http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java
index 1ec45ee..a21b043 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.parquet.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,8 +48,8 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
  *
  */
 public class FunctionSemanticAnalyzer extends BaseSemanticAnalyzer {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(FunctionSemanticAnalyzer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FunctionSemanticAnalyzer.class);
+  private static final Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState");
 
   public FunctionSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -80,6 +81,9 @@ public class FunctionSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     // find any referenced resources
     List<ResourceUri> resources = getResourceList(ast);
+    if (!isTemporaryFunction && resources == null) {
+      SESSION_STATE_LOG.warn("permanent functions created without USING clause will not be replicated.");
+    }
 
     CreateFunctionDesc desc =
         new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources);

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c9967e8..05d7be1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.parse;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
-
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +50,10 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
@@ -65,9 +68,10 @@ import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
@@ -86,7 +90,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
 
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // Database name or pattern
@@ -130,7 +139,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   };
 
-  public class DumpMetaData {
+  public static class DumpMetaData {
     // wrapper class for reading and writing metadata about a dump
     // responsible for _dumpmetadata files
 
@@ -142,15 +151,18 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     private final Path dumpRoot;
     private final Path dumpFile;
+    private final HiveConf hiveConf;
     private Path cmRoot;
 
-    public DumpMetaData(Path dumpRoot) {
+    public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
       this.dumpRoot = dumpRoot;
+      this.hiveConf = hiveConf;
       dumpFile = new Path(dumpRoot, DUMPMETADATA);
     }
 
-    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
-      this(dumpRoot);
+    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot,
+        HiveConf hiveConf){
+      this(dumpRoot,hiveConf);
       setDump(lvl, eventFrom, eventTo, cmRoot);
     }
 
@@ -165,7 +177,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     public void loadDumpFromFile() throws SemanticException {
       try {
         // read from dumpfile and instantiate self
-        FileSystem fs = dumpFile.getFileSystem(conf);
+        FileSystem fs = dumpFile.getFileSystem(hiveConf);
         BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
         String line = null;
         if ( (line = br.readLine()) != null){
@@ -230,8 +242,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     public void write() throws SemanticException {
-      writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(),
-          cmRoot.toString(), payload), dumpFile);
+      writeOutput(
+          Arrays.asList(
+              dumpType.toString(),
+              eventFrom.toString(),
+              eventTo.toString(),
+              cmRoot.toString(),
+              payload),
+          dumpFile,
+          hiveConf
+      );
     }
 
   }
@@ -314,7 +334,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit));
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, getNextDumpDir());
-    DumpMetaData dmd = new DumpMetaData(dumpRoot);
+    DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
     Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
     Long lastReplId;
     try {
@@ -324,6 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         for (String dbName : matchesDb(dbNameOrPattern)) {
           LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
           Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+          dumpFunctionMetadata(dbName, dumpRoot);
           for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
             LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
                 + " to db root " + dbRoot.toUri());
@@ -403,7 +424,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
         writeOutput(
             Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(lastReplId)),
-            dmd.getDumpFilePath());
+            dmd.getDumpFilePath(), conf);
         dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, lastReplId, cmRoot);
         dmd.write();
       }
@@ -417,254 +438,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
-    long evid = ev.getEventId();
-    String evidStr = String.valueOf(evid);
-    ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr);
-    MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
-    switch (ev.getEventType()){
-      case MessageFactory.CREATE_TABLE_EVENT : {
-        CreateTableMessage ctm = md.getCreateTableMessage(ev.getMessage());
-        LOG.info("Processing#{} CREATE_TABLE message : {}", ev.getEventId(), ev.getMessage());
-        org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
-
-        if (tobj == null){
-          LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
-          break;
-        }
-
-        Table qlMdTable = new Table(tobj);
-        if (qlMdTable.isView()) {
-          replicationSpec.setIsMetadataOnly(true);
-        }
-
-        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
-        EximUtil.createExportDump(
-            metaDataPath.getFileSystem(conf),
-            metaDataPath,
-            qlMdTable,
-            null,
-            replicationSpec);
-
-        Path dataPath = new Path(evRoot, "data");
-        Iterable<String> files = ctm.getFiles();
-        if (files != null) {
-          // encoded filename/checksum of files, write into _files
-          FileSystem fs = dataPath.getFileSystem(conf);
-          Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-          BufferedWriter fileListWriter = new BufferedWriter(
-              new OutputStreamWriter(fs.create(filesPath)));
-          try {
-            for (String file : files) {
-              fileListWriter.write(file + "\n");
-            }
-          } finally {
-            fileListWriter.close();
-          }
-        }
-
-        (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid, cmRoot)).write();
-        break;
-      }
-      case MessageFactory.ADD_PARTITION_EVENT : {
-        AddPartitionMessage apm = md.getAddPartitionMessage(ev.getMessage());
-        LOG.info("Processing#{} ADD_PARTITION message : {}", ev.getEventId(), ev.getMessage());
-        Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
-        if ((ptns == null) || (!ptns.iterator().hasNext())) {
-          LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
-          break;
-        }
-        org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
-        if (tobj == null){
-          LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
-          break;
-        }
-
-        final Table qlMdTable = new Table(tobj);
-        Iterable<Partition> qlPtns = Iterables.transform(
-            ptns,
-            new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
-              @Nullable
-              @Override
-              public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
-                if (input == null) {
-                  return null;
-                }
-                try {
-                  return new Partition(qlMdTable, input);
-                } catch (HiveException e) {
-                  throw new IllegalArgumentException(e);
-                }
-              }
-            }
-        );
-
-        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
-        EximUtil.createExportDump(
-            metaDataPath.getFileSystem(conf),
-            metaDataPath,
-            qlMdTable,
-            qlPtns,
-            replicationSpec);
-
-        Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
-        for (Partition qlPtn : qlPtns){
-          PartitionFiles partitionFiles = partitionFilesIter.next();
-          Iterable<String> files = partitionFiles.getFiles();
-          if (files != null) {
-            // encoded filename/checksum of files, write into _files
-            Path ptnDataPath = new Path(evRoot, qlPtn.getName());
-            FileSystem fs = ptnDataPath.getFileSystem(conf);
-            Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
-            BufferedWriter fileListWriter = new BufferedWriter(
-                new OutputStreamWriter(fs.create(filesPath)));
-            try {
-              for (String file : files) {
-                fileListWriter.write(file + "\n");
-              }
-            } finally {
-              fileListWriter.close();
-            }
-          }
-        }
-
-        (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid, cmRoot)).write();
-        break;
-      }
-      case MessageFactory.DROP_TABLE_EVENT : {
-        LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid, cmRoot);
-        dmd.setPayload(ev.getMessage());
-        dmd.write();
-        break;
-      }
-      case MessageFactory.DROP_PARTITION_EVENT : {
-        LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid, cmRoot);
-        dmd.setPayload(ev.getMessage());
-        dmd.write();
-        break;
-      }
-      case MessageFactory.ALTER_TABLE_EVENT : {
-        LOG.info("Processing#{} ALTER_TABLE message : {}", ev.getEventId(), ev.getMessage());
-        AlterTableMessage atm = md.getAlterTableMessage(ev.getMessage());
-        org.apache.hadoop.hive.metastore.api.Table tobjBefore = atm.getTableObjBefore();
-        org.apache.hadoop.hive.metastore.api.Table tobjAfter = atm.getTableObjAfter();
-
-        if (tobjBefore.getDbName().equals(tobjAfter.getDbName()) &&
-            tobjBefore.getTableName().equals(tobjAfter.getTableName())){
-          // regular alter scenario
-          replicationSpec.setIsMetadataOnly(true);
-          Table qlMdTableAfter = new Table(tobjAfter);
-          Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
-          EximUtil.createExportDump(
-              metaDataPath.getFileSystem(conf),
-              metaDataPath,
-              qlMdTableAfter,
-              null,
-              replicationSpec);
-
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
-        } else {
-          // rename scenario
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
-        }
-
-        break;
-      }
-      case MessageFactory.ALTER_PARTITION_EVENT : {
-        LOG.info("Processing#{} ALTER_PARTITION message : {}", ev.getEventId(), ev.getMessage());
-        AlterPartitionMessage apm = md.getAlterPartitionMessage(ev.getMessage());
-        org.apache.hadoop.hive.metastore.api.Table tblObj = apm.getTableObj();
-        org.apache.hadoop.hive.metastore.api.Partition pobjBefore = apm.getPtnObjBefore();
-        org.apache.hadoop.hive.metastore.api.Partition pobjAfter = apm.getPtnObjAfter();
-
-        boolean renameScenario = false;
-        Iterator<String> beforeValIter = pobjBefore.getValuesIterator();
-        Iterator<String> afterValIter = pobjAfter.getValuesIterator();
-        for ( ; beforeValIter.hasNext() ; ){
-          if (!beforeValIter.next().equals(afterValIter.next())){
-            renameScenario = true;
-            break;
-          }
-        }
-
-        if (!renameScenario){
-          // regular partition alter
-          replicationSpec.setIsMetadataOnly(true);
-          Table qlMdTable = new Table(tblObj);
-          List<Partition> qlPtns = new ArrayList<Partition>();
-          qlPtns.add(new Partition(qlMdTable, pobjAfter));
-          Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
-          EximUtil.createExportDump(
-              metaDataPath.getFileSystem(conf),
-              metaDataPath,
-              qlMdTable,
-              qlPtns,
-              replicationSpec);
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
-          break;
-        } else {
-          // rename scenario
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot);
-          dmd.setPayload(ev.getMessage());
-          dmd.write();
-          break;
-        }
-      }
-      case MessageFactory.INSERT_EVENT: {
-        InsertMessage insertMsg = md.getInsertMessage(ev.getMessage());
-        String dbName = insertMsg.getDB();
-        String tblName = insertMsg.getTable();
-        org.apache.hadoop.hive.metastore.api.Table tobj = db.getMSC().getTable(dbName, tblName);
-        Table qlMdTable = new Table(tobj);
-        Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
-        List<Partition> qlPtns  = null;
-        if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
-          qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
-        }
-        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
-        replicationSpec.setIsInsert(true); // Mark the replication type as insert into to avoid overwrite while import
-        EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns,
-            replicationSpec);
-        Iterable<String> files = insertMsg.getFiles();
-
-        if (files != null) {
-          // encoded filename/checksum of files, write into _files
-          Path dataPath = new Path(evRoot, EximUtil.DATA_PATH_NAME);
-          Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-          FileSystem fs = dataPath.getFileSystem(conf);
-          BufferedWriter fileListWriter =
-              new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-
-          try {
-            for (String file : files) {
-              fileListWriter.write(file + "\n");
-            }
-          } finally {
-            fileListWriter.close();
-          }
-        }
-
-        LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid, cmRoot);
-        dmd.setPayload(ev.getMessage());
-        dmd.write();
-        break;
-      }
-      // TODO : handle other event types
-      default:
-        LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid, cmRoot);
-        dmd.setPayload(ev.getMessage());
-        dmd.write();
-        break;
-    }
+    EventHandler.Context context = new EventHandler.Context(
+        evRoot,
+        cmRoot,
+        db,
+        conf,
+        getNewEventOnlyReplicationSpec(ev.getEventId())
+    );
+    EventHandlerFactory.handlerFor(ev).handle(context);
 
   }
 
@@ -712,6 +493,37 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     return dbRoot;
   }
 
+  private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+  private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+  private final static Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState");
+
+  private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
+    Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
+    try {
+      // TODO : This should ideally return the Function Objects and not Strings(function names) that should be done by the caller, Look at this separately.
+      List<String> functionNames = db.getFunctions(dbName, "*");
+      for (String functionName : functionNames) {
+        org.apache.hadoop.hive.metastore.api.Function function =
+            db.getFunction(dbName, functionName);
+        if (function.getResourceUris().isEmpty()) {
+          SESSION_STATE_LOG.warn(
+              "Not replicating function: " + functionName + " as it seems to have been created "
+                  + "without USING clause");
+          continue;
+        }
+
+        Path functionMetadataRoot =
+            new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME);
+        try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf),
+            functionMetadataRoot)) {
+          new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec());
+        }
+      }
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
   /**
    *
    * @param ast
@@ -822,7 +634,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath);
+      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
 
       boolean evDump = false;
       if (dmd.isIncrementalDump()){
@@ -900,7 +712,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           // entire chain
 
           String locn = dir.getPath().toUri().toString();
-          DumpMetaData eventDmd = new DumpMetaData(new Path(locn));
+          DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
           List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
               dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
               dbsUpdated, tablesUpdated, eventDmd);
@@ -1020,7 +832,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         DropTableDesc dropTableDesc = new DropTableDesc(
             actualDbName + "." + actualTblName,
             null, true, true,
-            getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+            getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
         Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf);
         if (precursor != null){
           precursor.addDependentTask(dropTableTask);
@@ -1044,7 +856,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
             DropTableDesc dropPtnDesc = new DropTableDesc(
                 actualDbName + "." + actualTblName,
                 partSpecs, null, true,
-                getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+                getNewEventOnlyReplicationSpec(dmd.getEventFrom()));
             Task<DDLWork> dropPtnTask =
                 TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf);
             if (precursor != null) {
@@ -1360,7 +1172,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
     setFetchTask(createFetchTask("last_repl_id#string"));
     LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
-        String.valueOf(replLastId), ctx.getResFile());
+        String.valueOf(replLastId), ctx.getResFile(), conf);
   }
 
   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
@@ -1369,14 +1181,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       LOG.debug("    > " + s);
     }
     ctx.setResFile(ctx.getLocalTmpPath());
-    writeOutput(values, ctx.getResFile());
+    writeOutput(values, ctx.getResFile(), conf);
   }
 
-  private void writeOutput(List<String> values, Path outputFile) throws SemanticException {
+  private static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf)
+      throws SemanticException {
     FileSystem fs = null;
     DataOutputStream outStream = null;
     try {
-      fs = outputFile.getFileSystem(conf);
+      fs = outputFile.getFileSystem(hiveConf);
       outStream = fs.create(outputFile);
       outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
       for (int i = 1; i < values.size(); i++) {
@@ -1407,9 +1220,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     return new ReplicationSpec(true, false, evState, objState, false, true, false);
   }
 
-  // Use for replication states focussed on event only, where the obj state will be the event state
-  private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws SemanticException {
-    return getNewReplicationSpec(evState, evState);
+  // Use for replication states focused on event only, where the obj state will be the event state
+  private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
+    return getNewReplicationSpec(eventId.toString(), eventId.toString());
   }
 
   private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)

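Two structural points in this file are easy to miss in the diff: dumpEvent() now only builds an EventHandler.Context and delegates to EventHandlerFactory, and the new bootstrap step dumpFunctionMetadata() lays function metadata out as <dumpRoot>/<dbName>/_functions/<functionName>/_metadata, one JSON file per permanent function that was created with a USING clause. A condensed restatement of that loop, with the directory-name constants shown inline and the analyzer's db/conf fields assumed in scope:

// Illustrative restatement of dumpFunctionMetadata() above, not new behavior.
Path functionsRoot = new Path(new Path(dumpRoot, dbName), "_functions");
for (String functionName : db.getFunctions(dbName, "*")) {
  org.apache.hadoop.hive.metastore.api.Function function = db.getFunction(dbName, functionName);
  if (function.getResourceUris().isEmpty()) {
    continue;  // created without a USING clause: warned about and skipped
  }
  Path metadataRoot = new Path(new Path(functionsRoot, functionName), "_metadata");
  try (JsonWriter jsonWriter = new JsonWriter(metadataRoot.getFileSystem(conf), metadataRoot)) {
    new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec());
  }
}
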
http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
new file mode 100644
index 0000000..40770de
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+
+public class DBSerializer implements JsonWriter.Serializer {
+  private final Database dbObject;
+
+  public DBSerializer(Database dbObject) {
+    this.dbObject = dbObject;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    dbObject.putToParameters(
+        ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+        additionalPropertiesProvider.getCurrentReplicationState()
+    );
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      String value = serializer.toString(dbObject, "UTF-8");
+      writer.jsonGenerator.writeStringField("db", value);
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
new file mode 100644
index 0000000..6b03766
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+
+public class FunctionSerializer implements JsonWriter.Serializer {
+  private Function function;
+
+  public FunctionSerializer(Function function) {
+    this.function = function;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      writer.jsonGenerator
+          .writeStringField("function", serializer.toString(function, "UTF-8"));
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
new file mode 100644
index 0000000..1aa1195
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_VERSION;
+
+public class JsonWriter implements Closeable {
+
+  final JsonGenerator jsonGenerator;
+
+  public JsonWriter(FileSystem fs, Path writePath) throws IOException {
+    OutputStream out = fs.create(writePath);
+    jsonGenerator = new JsonFactory().createJsonGenerator(out);
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeStringField("version", METADATA_FORMAT_VERSION);
+  }
+
+  @Override
+  public void close() throws IOException {
+    jsonGenerator.writeEndObject();
+    jsonGenerator.close();
+  }
+
+  public interface Serializer {
+    void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws
+        SemanticException, IOException;
+  }
+}

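JsonWriter owns the enclosing JSON object: its constructor creates the file and writes the version field, close() writes the end-object and releases the stream, and everything in between is contributed by JsonWriter.Serializer implementations. Because the jsonGenerator field is package-private, serializers live in the same org.apache.hadoop.hive.ql.parse.repl.dump package, as the ones in this patch do. A hypothetical serializer, only to show the contract (this class and its "comment" field are not part of the patch):

package org.apache.hadoop.hive.ql.parse.repl.dump;

import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;

import java.io.IOException;

public class CommentSerializer implements JsonWriter.Serializer {
  private final String comment;

  public CommentSerializer(String comment) {
    this.comment = comment;
  }

  @Override
  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
      throws SemanticException, IOException {
    // Serializers only append fields; JsonWriter has already opened the object
    // and will close it when the caller's try-with-resources block ends.
    writer.jsonGenerator.writeStringField("comment", comment);
  }
}
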
http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
new file mode 100644
index 0000000..313d108
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+
+class PartitionSerializer implements JsonWriter.Serializer {
+  private Partition partition;
+
+  PartitionSerializer(Partition partition) {
+    this.partition = partition;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+    try {
+      if (additionalPropertiesProvider.isInReplicationScope()) {
+        partition.putToParameters(
+            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+            additionalPropertiesProvider.getCurrentReplicationState());
+        if (isPartitionExternal()) {
+          // Replication destination will not be external
+          partition.putToParameters("EXTERNAL", "FALSE");
+        }
+      }
+      writer.jsonGenerator.writeString(serializer.toString(partition, "UTF-8"));
+      writer.jsonGenerator.flush();
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+
+  private boolean isPartitionExternal() {
+    Map<String, String> params = partition.getParameters();
+    return params.containsKey("EXTERNAL")
+        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
new file mode 100644
index 0000000..d88a553
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.IOException;
+
+public class ReplicationSpecSerializer implements JsonWriter.Serializer {
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()) {
+      String value = additionalPropertiesProvider.get(key);
+      if (value != null) {
+        writer.jsonGenerator.writeStringField(key.toString(), value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
new file mode 100644
index 0000000..a2e258f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TableSerializer implements JsonWriter.Serializer {
+  private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
+  private final Iterable<Partition> partitions;
+
+  public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
+      Iterable<Partition> partitions) {
+    this.tableHandle = tableHandle;
+    this.partitions = partitions;
+  }
+
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    if (cannotReplicateTable(additionalPropertiesProvider)) {
+      return;
+    }
+
+    Table tTable = tableHandle.getTTable();
+    tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
+    try {
+      TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+      writer.jsonGenerator
+          .writeStringField("table", serializer.toString(tTable, "UTF-8"));
+      writer.jsonGenerator.writeFieldName("partitions");
+      writePartitions(writer, additionalPropertiesProvider);
+    } catch (TException e) {
+      throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
+    }
+  }
+
+  private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvider) {
+    return tableHandle == null || additionalPropertiesProvider.isNoop();
+  }
+
+  private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    if (additionalPropertiesProvider.isInReplicationScope()) {
+      table.putToParameters(
+            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+            additionalPropertiesProvider.getCurrentReplicationState());
+      if (isExternalTable(table)) {
+          // Replication destination will not be external - override if set
+        table.putToParameters("EXTERNAL", "FALSE");
+        }
+      if (isExternalTableType(table)) {
+          // Replication dest will not be external - override if set
+        table.setTableType(TableType.MANAGED_TABLE.toString());
+        }
+    } else {
+      // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
+      // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
+      // TODO: if we want to be explicit about this dump not being a replication dump, we can
+      // uncomment this else section, but currently unnneeded. Will require a lot of golden file
+      // regen if we do so.
+    }
+    return table;
+  }
+
+  private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
+    return table.isSetTableType()
+        && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+  }
+
+  private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
+    Map<String, String> params = table.getParameters();
+    return params.containsKey("EXTERNAL")
+        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
+  }
+
+  private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    writer.jsonGenerator.writeStartArray();
+    if (partitions != null) {
+      for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) {
+        new PartitionSerializer(partition.getTPartition())
+            .writeTo(writer, additionalPropertiesProvider);
+      }
+    }
+    writer.jsonGenerator.writeEndArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
new file mode 100644
index 0000000..3ebc803
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION;
+
+/**
+ * This class is currently unused: the conditional that led to its use was always false, so both
+ * the conditional and the usage of this class have been removed. It might be required in future.
+ */
+public class VersionCompatibleSerializer implements JsonWriter.Serializer {
+  @Override
+  public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
+      throws SemanticException, IOException {
+    writer.jsonGenerator.writeStringField("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
new file mode 100644
index 0000000..ab059c2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHandler implements EventHandler {
+  static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
+
+  final NotificationEvent event;
+  final MessageDeserializer deserializer;
+
+  AbstractHandler(NotificationEvent event) {
+    this.event = event;
+    deserializer = MessageFactory.getInstance().getDeserializer();
+  }
+
+  @Override
+  public long fromEventId() {
+    return event.getEventId();
+  }
+
+  @Override
+  public long toEventId() {
+    return event.getEventId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
new file mode 100644
index 0000000..9a4f8b9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import javax.annotation.Nullable;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+
+public class AddPartitionHandler extends AbstractHandler {
+  protected AddPartitionHandler(NotificationEvent notificationEvent) {
+    super(notificationEvent);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
+    LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
+    Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
+    if ((ptns == null) || (!ptns.iterator().hasNext())) {
+      LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", fromEventId());
+      return;
+    }
+    org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
+    if (tobj == null) {
+      LOG.debug("Event#{} was an ADD_PTN_EVENT with no table listed", fromEventId());
+      return;
+    }
+
+    final Table qlMdTable = new Table(tobj);
+    Iterable<Partition> qlPtns = Iterables.transform(
+        ptns,
+        new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
+          @Nullable
+          @Override
+          public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
+            if (input == null) {
+              return null;
+            }
+            try {
+              return new Partition(qlMdTable, input);
+            } catch (HiveException e) {
+              throw new IllegalArgumentException(e);
+            }
+          }
+        }
+    );
+
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    EximUtil.createExportDump(
+        metaDataPath.getFileSystem(withinContext.hiveConf),
+        metaDataPath,
+        qlMdTable,
+        qlPtns,
+        withinContext.replicationSpec);
+
+    Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
+    for (Partition qlPtn : qlPtns) {
+      Iterable<String> files = partitionFilesIter.next().getFiles();
+      if (files != null) {
+        // encoded filename/checksum of files, write into _files
+        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
+          for (String file : files) {
+            fileListWriter.write(file + "\n");
+          }
+        }
+      }
+    }
+    withinContext.createDmd(this).write();
+  }
+
+  private BufferedWriter writer(Context withinContext, Partition qlPtn)
+      throws IOException {
+    Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
+    FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
+    Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_ADD_PARTITION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
new file mode 100644
index 0000000..1073cd0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+public class AlterPartitionHandler extends AbstractHandler {
+  private final org.apache.hadoop.hive.metastore.api.Partition after;
+  private final org.apache.hadoop.hive.metastore.api.Table tableObject;
+  private final Scenario scenario;
+
+  AlterPartitionHandler(NotificationEvent event) throws Exception {
+    super(event);
+    AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage());
+    tableObject = apm.getTableObj();
+    org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore();
+    after = apm.getPtnObjAfter();
+    scenario = scenarioType(before, after);
+  }
+
+  private enum Scenario {
+    ALTER {
+      @Override
+      DUMPTYPE dumpType() {
+        return DUMPTYPE.EVENT_ALTER_PARTITION;
+      }
+    },
+    RENAME {
+      @Override
+      DUMPTYPE dumpType() {
+        return DUMPTYPE.EVENT_RENAME_PARTITION;
+      }
+    };
+
+    abstract DUMPTYPE dumpType();
+  }
+
+  private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before,
+      org.apache.hadoop.hive.metastore.api.Partition after) {
+    Iterator<String> beforeValIter = before.getValuesIterator();
+    Iterator<String> afterValIter = after.getValuesIterator();
+    while (beforeValIter.hasNext()) {
+      if (!beforeValIter.next().equals(afterValIter.next())) {
+        return Scenario.RENAME;
+      }
+    }
+    return Scenario.ALTER;
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
+
+    if (Scenario.ALTER == scenario) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+      Table qlMdTable = new Table(tableObject);
+      List<Partition> qlPtns = new ArrayList<>();
+      qlPtns.add(new Partition(qlMdTable, after));
+      Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+      EximUtil.createExportDump(
+          metaDataPath.getFileSystem(withinContext.hiveConf),
+          metaDataPath,
+          qlMdTable,
+          qlPtns,
+          withinContext.replicationSpec);
+    }
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return scenario.dumpType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
new file mode 100644
index 0000000..04d9d79
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+public class AlterTableHandler extends AbstractHandler {
+  private final org.apache.hadoop.hive.metastore.api.Table before;
+  private final org.apache.hadoop.hive.metastore.api.Table after;
+  private final Scenario scenario;
+
+  private enum Scenario {
+    ALTER {
+      @Override
+      DUMPTYPE dumpType() {
+        return DUMPTYPE.EVENT_ALTER_TABLE;
+      }
+    },
+    RENAME {
+      @Override
+      DUMPTYPE dumpType() {
+        return DUMPTYPE.EVENT_RENAME_TABLE;
+      }
+    };
+
+    abstract DUMPTYPE dumpType();
+  }
+
+  AlterTableHandler(NotificationEvent event) throws Exception {
+    super(event);
+    AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage());
+    before = atm.getTableObjBefore();
+    after = atm.getTableObjAfter();
+    scenario = scenarioType(before, after);
+  }
+
+  private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
+      org.apache.hadoop.hive.metastore.api.Table after) {
+    return before.getDbName().equals(after.getDbName())
+        && before.getTableName().equals(after.getTableName())
+        ? Scenario.ALTER
+        : Scenario.RENAME;
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
+    if (Scenario.ALTER == scenario) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+      Table qlMdTableAfter = new Table(after);
+      Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+      EximUtil.createExportDump(
+          metaDataPath.getFileSystem(withinContext.hiveConf),
+          metaDataPath,
+          qlMdTableAfter,
+          null,
+          withinContext.replicationSpec);
+    }
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return scenario.dumpType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
new file mode 100644
index 0000000..03f400d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+
+public class CreateTableHandler extends AbstractHandler {
+
+  CreateTableHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
+    org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
+
+    if (tobj == null) {
+      LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed", fromEventId());
+      return;
+    }
+
+    Table qlMdTable = new Table(tobj);
+    if (qlMdTable.isView()) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+    }
+
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    EximUtil.createExportDump(
+        metaDataPath.getFileSystem(withinContext.hiveConf),
+        metaDataPath,
+        qlMdTable,
+        null,
+        withinContext.replicationSpec);
+
+    Path dataPath = new Path(withinContext.eventRoot, "data");
+    Iterable<String> files = ctm.getFiles();
+    if (files != null) {
+      // encoded filename/checksum of files, write into _files
+      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+    withinContext.createDmd(this).write();
+  }
+
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_CREATE_TABLE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
new file mode 100644
index 0000000..61c5f37
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+public class DefaultHandler extends AbstractHandler {
+
+  DefaultHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_UNKNOWN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
new file mode 100644
index 0000000..3ad794e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+public class DropPartitionHandler extends AbstractHandler {
+
+  DropPartitionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_DROP_PARTITION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
new file mode 100644
index 0000000..cae379b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+public class DropTableHandler extends AbstractHandler {
+
+  DropTableHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_DROP_TABLE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
new file mode 100644
index 0000000..199145a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+
+public interface EventHandler {
+  void handle(Context withinContext) throws Exception;
+
+  long fromEventId();
+
+  long toEventId();
+
+  DUMPTYPE dumpType();
+
+  class Context {
+    final Path eventRoot, cmRoot;
+    final Hive db;
+    final HiveConf hiveConf;
+    final ReplicationSpec replicationSpec;
+
+    public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
+        ReplicationSpec replicationSpec) {
+      this.eventRoot = eventRoot;
+      this.cmRoot = cmRoot;
+      this.db = db;
+      this.hiveConf = hiveConf;
+      this.replicationSpec = replicationSpec;
+    }
+
+    DumpMetaData createDmd(EventHandler eventHandler) {
+      return new DumpMetaData(
+          eventRoot,
+          eventHandler.dumpType(),
+          eventHandler.fromEventId(),
+          eventHandler.toEventId(),
+          cmRoot, hiveConf
+      );
+    }
+  }
+}
