Repository: hive
Updated Branches:
  refs/heads/master 9945b5d5d -> 2985262b8
http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
new file mode 100644
index 0000000..53adea8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Chooses the {@link EventHandler} that processes a given {@link NotificationEvent}.
+ *
+ * <p>Handlers are looked up by event type. A handler class can only be registered if it declares
+ * a non-private constructor whose single parameter is a {@link NotificationEvent}; events whose
+ * type has no registered handler are given a {@link DefaultHandler}.
+ */
+public class EventHandlerFactory {
+  private EventHandlerFactory() {
+  }
+
+  // Handler constructors, keyed by event type. They are resolved and validated once in
+  // register() so that handlerFor() does not repeat the reflective lookup for every event.
+  private static final Map<String, Constructor<? extends EventHandler>> registeredHandlers =
+      new HashMap<>();
+
+  static {
+    register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+    register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
+    register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
+    register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
+    register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+    register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
+    register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+  }
+
+  /**
+   * Registers the handler class for an event type, replacing any earlier registration for it.
+   *
+   * @param event the event type, matched against {@link NotificationEvent#getEventType()}
+   * @param handlerClazz the handler implementation to instantiate for that event type
+   * @throws IllegalArgumentException if the class declares no constructor taking exactly one
+   *     {@link NotificationEvent}, or if that constructor is private
+   */
+  static void register(String event, Class<? extends EventHandler> handlerClazz) {
+    Constructor<? extends EventHandler> constructor;
+    try {
+      constructor = handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+    } catch (NoSuchMethodException e) {
+      // getName(), not getCanonicalName(): the latter is null for local and anonymous classes.
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getName()
+          + " does not have the a constructor with only parameter of type:"
+          + NotificationEvent.class.getCanonicalName(), e);
+    }
+    // An explicit check instead of an assert, so it also holds when assertions are disabled.
+    if (Modifier.isPrivate(constructor.getModifiers())) {
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getName()
+          + " must declare a non-private constructor with only parameter of type:"
+          + NotificationEvent.class.getCanonicalName());
+    }
+    registeredHandlers.put(event, constructor);
+  }
+
+  /**
+   * Creates the handler that should process the given event.
+   *
+   * @param event the notification event to be handled
+   * @return a new instance of the handler registered for the event's type, or a
+   *     {@link DefaultHandler} when nothing is registered for that type
+   * @throws RuntimeException if the registered handler cannot be instantiated
+   */
+  public static EventHandler handlerFor(NotificationEvent event) {
+    Constructor<? extends EventHandler> constructor =
+        registeredHandlers.get(event.getEventType());
+    if (constructor == null) {
+      return new DefaultHandler(event);
+    }
+    try {
+      return constructor.newInstance(event);
+    } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
+      // this should never happen. however we want to make sure we propagate the exception
+      throw new RuntimeException(
+          "failed when creating handler for " + event.getEventType()
+              + " with the responsible class being "
+              + constructor.getDeclaringClass().getName(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
new file mode 100644
index 0000000..1346276
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.thrift.TException;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData;
+
+/**
+ * Dumps an INSERT notification event: the metadata of the table (and partition) inserted into,
+ * the list of files named by the insert message, and the dump metadata of the event itself.
+ */
+public class InsertHandler extends AbstractHandler {
+
+  InsertHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  /**
+   * Writes the dump for this insert event below {@code withinContext.eventRoot}.
+   *
+   * @param withinContext supplies the {@code db}, {@code hiveConf}, {@code replicationSpec}
+   *     and {@code eventRoot} used for the dump
+   * @throws Exception if any step of the dump fails
+   */
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
+    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
+    Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+    List<Partition> qlPtns = null;
+    if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+      qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
+    }
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    // Mark the replication type as insert into to avoid overwrite while import
+    withinContext.replicationSpec.setIsInsert(true);
+    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+        qlMdTable, qlPtns,
+        withinContext.replicationSpec);
+    Iterable<String> files = insertMsg.getFiles();
+
+    if (files != null) {
+      // encoded filename/checksum of files, write into _files
+      try (BufferedWriter fileListWriter = writer(withinContext)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+
+    LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  /** Fetches the table named by the insert message and wraps it as a ql metadata table. */
+  private org.apache.hadoop.hive.ql.metadata.Table tableObject(
+      Context withinContext, InsertMessage insertMsg) throws TException {
+    return new org.apache.hadoop.hive.ql.metadata.Table(
+        withinContext.db.getMSC().getTable(
+            insertMsg.getDB(), insertMsg.getTable()
+        )
+    );
+  }
+
+  /**
+   * Creates the {@code EximUtil.FILES_NAME} file under the event's data directory and returns
+   * a writer for it. The caller must close the returned writer.
+   */
+  private BufferedWriter writer(Context withinContext) throws IOException {
+    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    // Encode explicitly as UTF-8 so the listing does not depend on the JVM's default charset.
+    return new BufferedWriter(
+        new OutputStreamWriter(fs.create(filesPath), StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public DUMPTYPE dumpType() {
+    return DUMPTYPE.EVENT_INSERT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
new file mode 100644
index 0000000..0526700
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests handler registration and lookup in {@link EventHandlerFactory}. */
+public class TestEventHandlerFactory {
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
+    // Declares no constructor taking a NotificationEvent, so registration must be rejected.
+    class NonCompatibleEventHandler implements EventHandler {
+      @Override
+      public void handle(Context withinContext) throws Exception {
+
+      }
+
+      @Override
+      public long fromEventId() {
+        return 0;
+      }
+
+      @Override
+      public long toEventId() {
+        return 0;
+      }
+
+      @Override
+      public ReplicationSemanticAnalyzer.DUMPTYPE dumpType() {
+        return null;
+      }
+    }
+    EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class);
+  }
+
+  @Test
+  public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
+    EventHandler eventHandler =
+        EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE,
+            "shouldGiveDefaultHandler", "s"));
+    assertTrue(eventHandler instanceof DefaultHandler);
+  }
+
+}
\ No newline at end of file