Repository: hive
Updated Branches:
  refs/heads/master 9945b5d5d -> 2985262b8


http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
new file mode 100644
index 0000000..53adea8
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventHandlerFactory {
+  private EventHandlerFactory() {
+  }
+
+  private static Map<String, Class<? extends EventHandler>> registeredHandlers 
= new HashMap<>();
+
+  static {
+    register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+    register(MessageFactory.ALTER_PARTITION_EVENT, 
AlterPartitionHandler.class);
+    register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
+    register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
+    register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+    register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
+    register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+  }
+
+  static void register(String event, Class<? extends EventHandler> 
handlerClazz) {
+    try {
+      Constructor<? extends EventHandler> constructor =
+          handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+      assert constructor != null;
+      assert !Modifier.isPrivate(constructor.getModifiers());
+      registeredHandlers.put(event, handlerClazz);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("handler class: " + 
handlerClazz.getCanonicalName()
+          + " does not have the a constructor with only parameter of type:"
+          + NotificationEvent.class.getCanonicalName(), e);
+    }
+  }
+
+  public static EventHandler handlerFor(NotificationEvent event) {
+    if (registeredHandlers.containsKey(event.getEventType())) {
+      Class<? extends EventHandler> handlerClazz = 
registeredHandlers.get(event.getEventType());
+      try {
+        Constructor<? extends EventHandler> constructor =
+            handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+        return constructor.newInstance(event);
+      } catch (NoSuchMethodException | IllegalAccessException | 
InstantiationException | InvocationTargetException e) {
+        // this should never happen. however we want to make sure we propagate 
the exception
+        throw new RuntimeException(
+            "failed when creating handler for " + event.getEventType()
+                + " with the responsible class being " + 
handlerClazz.getCanonicalName(), e);
+      }
+    }
+    return new DefaultHandler(event);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
new file mode 100644
index 0000000..1346276
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.thrift.TException;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static 
org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+/**
+ * Handler for insert notification events: dumps the table (and, when applicable, the
+ * single affected partition) metadata, the list of inserted files, and the dump metadata
+ * carrying the original event message.
+ */
+public class InsertHandler extends AbstractHandler {
+
+  InsertHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  /**
+   * Writes the artifacts for this insert event under {@code withinContext.eventRoot}.
+   *
+   * @param withinContext the context supplying the Hive client, configuration, event root
+   *                      and replication spec used for the dump
+   * @throws Exception if reading the table/partition or writing any of the files fails
+   */
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
+    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
+    Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+    // Stays null unless the insert targeted a specific partition of a partitioned table.
+    List<Partition> qlPtns = null;
+    if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+      qlPtns = Collections.singletonList(
+          withinContext.db.getPartition(qlMdTable, partSpec, false));
+    }
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    // Mark the replication type as insert into to avoid overwrite while import
+    withinContext.replicationSpec.setIsInsert(true);
+    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+        qlMdTable, qlPtns,
+        withinContext.replicationSpec);
+    Iterable<String> files = insertMsg.getFiles();
+
+    if (files != null) {
+      // encoded filename/checksum of files, write into _files
+      try (BufferedWriter fileListWriter = writer(withinContext)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+
+    LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  /**
+   * Fetches the table named in the insert message through the metastore client and wraps
+   * it in the query-layer table type.
+   */
+  private org.apache.hadoop.hive.ql.metadata.Table tableObject(
+      Context withinContext, InsertMessage insertMsg) throws TException {
+    return new org.apache.hadoop.hive.ql.metadata.Table(
+        withinContext.db.getMSC().getTable(
+            insertMsg.getDB(), insertMsg.getTable()
+        )
+    );
+  }
+
+  /**
+   * Opens a writer on the files listing located under the event's data directory.
+   *
+   * <p>The encoding is pinned to UTF-8 so the listing does not depend on the default
+   * charset of the JVM producing the dump.
+   * NOTE(review): the code that reads this listing is not visible here; confirm it
+   * decodes as UTF-8 as well.
+   */
+  private BufferedWriter writer(Context withinContext) throws IOException {
+    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    return new BufferedWriter(
+        new OutputStreamWriter(fs.create(filesPath), java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_INSERT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
new file mode 100644
index 0000000..0526700
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
@@ -0,0 +1,44 @@
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventHandlerFactory}: rejection of handler classes that cannot be
+ * constructed from a {@link NotificationEvent}, and the fallback to {@link DefaultHandler}.
+ */
+public class TestEventHandlerFactory {
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
+    // Declares no constructor accepting a NotificationEvent, so registration must be refused.
+    class HandlerWithoutEventConstructor implements EventHandler {
+      @Override
+      public long fromEventId() {
+        return 0;
+      }
+
+      @Override
+      public long toEventId() {
+        return 0;
+      }
+
+      @Override
+      public ReplicationSemanticAnalyzer.DUMPTYPE dumpType() {
+        return null;
+      }
+
+      @Override
+      public void handle(Context withinContext) throws Exception {
+        // intentionally a no-op: the handler is never expected to be created
+      }
+    }
+    EventHandlerFactory.register("anyEvent", HandlerWithoutEventConstructor.class);
+  }
+
+  @Test
+  public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
+    // The event type below is not one the factory registers a handler for.
+    NotificationEvent unregisteredEvent =
+        new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE, "shouldGiveDefaultHandler", "s");
+    EventHandler resolvedHandler = EventHandlerFactory.handlerFor(unregisteredEvent);
+    assertTrue(resolvedHandler instanceof DefaultHandler);
+  }
+
+}
\ No newline at end of file

Reply via email to