HIVE-16266 : Enable function metadata to be written during bootstrap (Anishek Agarwal, reviewed by Sushanth Sowmyan)
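For context before the diff: the core refactoring replaces hand-rolled JsonGenerator calls in EximUtil with a small JsonWriter that owns the enclosing JSON object, plus one JsonWriter.Serializer per metastore object, and the new function-metadata dump reuses the same pattern. A minimal usage sketch in Java, mirroring EximUtil.createDbExportDump in the diff below; the helper method name is hypothetical, and the FileSystem, Path, Database, and ReplicationSpec values are assumed to be supplied by the caller:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.api.Database;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
    import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer;
    import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter;

    // Hypothetical helper illustrating the pattern; not part of the patch itself.
    static void dumpDbMetadata(FileSystem fs, Path metadataPath, Database dbObj,
        ReplicationSpec replicationSpec) throws Exception {
      // JsonWriter writes the start-object marker and the "version" field on
      // construction; close() (via try-with-resources) writes the end-object
      // marker, so each Serializer contributes only its own fields in between.
      try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {
        new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec);
      }
    }

Event dumping is restructured the same way: each notification event type gets its own EventHandler (see EventHandlerFactory), replacing the large switch statement in ReplicationSemanticAnalyzer.dumpEvent.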
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2985262b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2985262b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2985262b Branch: refs/heads/master Commit: 2985262b895cb00b22935d0ff660a6003082b631 Parents: 9945b5d Author: Sushanth Sowmyan <khorg...@gmail.com> Authored: Mon Apr 3 15:30:58 2017 -0700 Committer: Sushanth Sowmyan <khorg...@gmail.com> Committed: Mon Apr 3 15:32:19 2017 -0700 ---------------------------------------------------------------------- .../hive/ql/TestReplicationScenarios.java | 8 +- .../apache/hadoop/hive/ql/parse/EximUtil.java | 104 +----- .../hive/ql/parse/FunctionSemanticAnalyzer.java | 8 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 355 +++++-------------- .../hive/ql/parse/repl/dump/DBSerializer.java | 54 +++ .../ql/parse/repl/dump/FunctionSerializer.java | 48 +++ .../hive/ql/parse/repl/dump/JsonWriter.java | 54 +++ .../ql/parse/repl/dump/PartitionSerializer.java | 64 ++++ .../repl/dump/ReplicationSpecSerializer.java | 36 ++ .../ql/parse/repl/dump/TableSerializer.java | 113 ++++++ .../repl/dump/VersionCompatibleSerializer.java | 37 ++ .../ql/parse/repl/events/AbstractHandler.java | 46 +++ .../parse/repl/events/AddPartitionHandler.java | 114 ++++++ .../repl/events/AlterPartitionHandler.java | 103 ++++++ .../ql/parse/repl/events/AlterTableHandler.java | 92 +++++ .../parse/repl/events/CreateTableHandler.java | 86 +++++ .../ql/parse/repl/events/DefaultHandler.java | 43 +++ .../parse/repl/events/DropPartitionHandler.java | 43 +++ .../ql/parse/repl/events/DropTableHandler.java | 43 +++ .../hive/ql/parse/repl/events/EventHandler.java | 62 ++++ .../parse/repl/events/EventHandlerFactory.java | 75 ++++ .../ql/parse/repl/events/InsertHandler.java | 96 +++++ .../repl/events/TestEventHandlerFactory.java | 44 +++ 23 files changed, 1357 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 9e79b6a..2688f35 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -1251,8 +1251,6 @@ public class TestReplicationScenarios { assertFalse(EventUtils.andFilter(no, yes, no).accept(dummyEvent)); assertFalse(EventUtils.andFilter(no, no, yes).accept(dummyEvent)); assertFalse(EventUtils.andFilter(no, no, no).accept(dummyEvent)); - - } private NotificationEvent createDummyEvent(String dbname, String tblname, long evid) { @@ -1283,7 +1281,7 @@ public class TestReplicationScenarios { if (tblName != null){ verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId); } - assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0); + assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); return lastReplDumpId; } @@ -1298,7 +1296,7 @@ public class TestReplicationScenarios { run("REPL LOAD " + dbName + "_dupe." 
+ tblName + " FROM '" + lastDumpLocn + "'"); verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId); verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId); - assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0); + assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); return lastReplDumpId; } @@ -1392,7 +1390,7 @@ public class TestReplicationScenarios { return success; } - public static void createTestDataFile(String filename, String[] lines) throws IOException { + private static void createTestDataFile(String filename, String[] lines) throws IOException { FileWriter writer = null; try { File file = new File(filename); http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 10cc286..1ea5182 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.base.Function; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.metastore.api.Database; @@ -34,26 +33,24 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.repl.dump.DBSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter; +import org.apache.hadoop.hive.ql.parse.repl.dump.ReplicationSpecSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.TableSerializer; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TJSONProtocol; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonGenerator; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import javax.annotation.Nullable; - import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; @@ -240,34 +237,16 @@ public class EximUtil { /* If null, then the major version number should match */ public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null; - public static void createDbExportDump( - FileSystem fs, Path metadataPath, Database dbObj, + public static void createDbExportDump(FileSystem fs, Path metadataPath, Database dbObj, ReplicationSpec replicationSpec) throws IOException, SemanticException { // WARNING NOTE : at this point, createDbExportDump lives only in a world where ReplicationSpec is in replication scope // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using // Replv2 semantics, i.e. 
with listFiles laziness (no copy at export time) - OutputStream out = fs.create(metadataPath); - JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out); - jgen.writeStartObject(); - jgen.writeStringField("version",METADATA_FORMAT_VERSION); - dbObj.putToParameters(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState()); - - if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) { - jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION); - } - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - try { - jgen.writeStringField("db", serializer.toString(dbObj, "UTF-8")); - } catch (TException e) { - throw new SemanticException( - ErrorMsg.ERROR_SERIALIZE_METASTORE - .getMsg(), e); + try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) { + new DBSerializer(dbObj).writeTo(jsonWriter, replicationSpec); } - - jgen.writeEndObject(); - jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close. } public static void createExportDump(FileSystem fs, Path metadataPath, @@ -283,73 +262,12 @@ public class EximUtil { replicationSpec.setNoop(true); } - OutputStream out = fs.create(metadataPath); - JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out); - jgen.writeStartObject(); - jgen.writeStringField("version",METADATA_FORMAT_VERSION); - if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) { - jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION); - } - - if (replicationSpec.isInReplicationScope()){ - for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()){ - String value = replicationSpec.get(key); - if (value != null){ - jgen.writeStringField(key.toString(), value); - } - } - if (tableHandle != null){ - Table ttable = tableHandle.getTTable(); - ttable.putToParameters( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState()); - if ((ttable.getParameters().containsKey("EXTERNAL")) && - (ttable.getParameters().get("EXTERNAL").equalsIgnoreCase("TRUE"))){ - // Replication destination will not be external - override if set - ttable.putToParameters("EXTERNAL","FALSE"); - } - if (ttable.isSetTableType() && ttable.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString())){ - // Replication dest will not be external - override if set - ttable.setTableType(TableType.MANAGED_TABLE.toString()); - } - } - } else { - // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE; - // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\""); - // TODO: if we want to be explicit about this dump not being a replication dump, we can - // uncomment this else section, but currently unnneeded. Will require a lot of golden file - // regen if we do so. 
- } - if ((tableHandle != null) && (!replicationSpec.isNoop())){ - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - try { - jgen.writeStringField("table", serializer.toString(tableHandle.getTTable(), "UTF-8")); - jgen.writeFieldName("partitions"); - jgen.writeStartArray(); - if (partitions != null) { - for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { - Partition tptn = partition.getTPartition(); - if (replicationSpec.isInReplicationScope()){ - tptn.putToParameters( - ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState()); - if ((tptn.getParameters().containsKey("EXTERNAL")) && - (tptn.getParameters().get("EXTERNAL").equalsIgnoreCase("TRUE"))){ - // Replication destination will not be external - tptn.putToParameters("EXTERNAL", "FALSE"); - } - } - jgen.writeString(serializer.toString(tptn, "UTF-8")); - jgen.flush(); - } - } - jgen.writeEndArray(); - } catch (TException e) { - throw new SemanticException( - ErrorMsg.ERROR_SERIALIZE_METASTORE - .getMsg(), e); + try (JsonWriter writer = new JsonWriter(fs, metadataPath)) { + if (replicationSpec.isInReplicationScope()) { + new ReplicationSpecSerializer().writeTo(writer, replicationSpec); } + new TableSerializer(tableHandle, partitions).writeTo(writer, replicationSpec); } - jgen.writeEndObject(); - jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close. } /** http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java index 1ec45ee..a21b043 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FunctionSemanticAnalyzer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse; import java.util.ArrayList; import java.util.List; +import org.apache.parquet.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -47,8 +48,8 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; * */ public class FunctionSemanticAnalyzer extends BaseSemanticAnalyzer { - private static final Logger LOG = LoggerFactory - .getLogger(FunctionSemanticAnalyzer.class); + private static final Logger LOG = LoggerFactory.getLogger(FunctionSemanticAnalyzer.class); + private static final Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState"); public FunctionSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -80,6 +81,9 @@ public class FunctionSemanticAnalyzer extends BaseSemanticAnalyzer { // find any referenced resources List<ResourceUri> resources = getResourceList(ast); + if (!isTemporaryFunction && resources == null) { + SESSION_STATE_LOG.warn("permanent functions created without USING clause will not be replicated."); + } CreateFunctionDesc desc = new CreateFunctionDesc(functionName, isTemporaryFunction, className, resources); http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index c9967e8..05d7be1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; - import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -51,6 +50,10 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.dump.FunctionSerializer; +import org.apache.hadoop.hive.ql.parse.repl.dump.JsonWriter; +import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler; +import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; @@ -65,9 +68,10 @@ import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.DataOutputStream; @@ -86,7 +90,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import static org.apache.hadoop.hive.ql.parse.HiveParser.*; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO; public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // Database name or pattern @@ -130,7 +139,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { }; - public class DumpMetaData { + public static class DumpMetaData { // wrapper class for reading and writing metadata about a dump // responsible for _dumpmetadata files @@ -142,15 +151,18 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private final Path dumpRoot; private final Path dumpFile; + private final HiveConf hiveConf; private Path cmRoot; - public DumpMetaData(Path dumpRoot) { + public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { this.dumpRoot = dumpRoot; + this.hiveConf = hiveConf; dumpFile = new Path(dumpRoot, DUMPMETADATA); } - public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){ - this(dumpRoot); + public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot, + HiveConf hiveConf){ + this(dumpRoot,hiveConf); setDump(lvl, eventFrom, eventTo, cmRoot); } @@ -165,7 +177,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { public void loadDumpFromFile() throws SemanticException { try { // read from dumpfile and instantiate self - FileSystem fs = 
dumpFile.getFileSystem(conf); + FileSystem fs = dumpFile.getFileSystem(hiveConf); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line = null; if ( (line = br.readLine()) != null){ @@ -230,8 +242,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } public void write() throws SemanticException { - writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), - cmRoot.toString(), payload), dumpFile); + writeOutput( + Arrays.asList( + dumpType.toString(), + eventFrom.toString(), + eventTo.toString(), + cmRoot.toString(), + payload), + dumpFile, + hiveConf + ); } } @@ -314,7 +334,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit)); String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); Path dumpRoot = new Path(replRoot, getNextDumpDir()); - DumpMetaData dmd = new DumpMetaData(dumpRoot); + DumpMetaData dmd = new DumpMetaData(dumpRoot, conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; try { @@ -324,6 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { for (String dbName : matchesDb(dbNameOrPattern)) { LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); Path dbRoot = dumpDbMetadata(dbName, dumpRoot); + dumpFunctionMetadata(dbName, dumpRoot); for (String tblName : matchesTbl(dbName, tblNameOrPattern)) { LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); @@ -403,7 +424,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); writeOutput( Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(lastReplId)), - dmd.getDumpFilePath()); + dmd.getDumpFilePath(), conf); dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, lastReplId, cmRoot); dmd.write(); } @@ -417,254 +438,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception { - long evid = ev.getEventId(); - String evidStr = String.valueOf(evid); - ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr); - MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); - switch (ev.getEventType()){ - case MessageFactory.CREATE_TABLE_EVENT : { - CreateTableMessage ctm = md.getCreateTableMessage(ev.getMessage()); - LOG.info("Processing#{} CREATE_TABLE message : {}", ev.getEventId(), ev.getMessage()); - org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj(); - - if (tobj == null){ - LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed"); - break; - } - - Table qlMdTable = new Table(tobj); - if (qlMdTable.isView()) { - replicationSpec.setIsMetadataOnly(true); - } - - Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(conf), - metaDataPath, - qlMdTable, - null, - replicationSpec); - - Path dataPath = new Path(evRoot, "data"); - Iterable<String> files = ctm.getFiles(); - if (files != null) { - // encoded filename/checksum of files, write into _files - FileSystem fs = dataPath.getFileSystem(conf); - Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); - BufferedWriter fileListWriter = new BufferedWriter( - new 
OutputStreamWriter(fs.create(filesPath))); - try { - for (String file : files) { - fileListWriter.write(file + "\n"); - } - } finally { - fileListWriter.close(); - } - } - - (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid, cmRoot)).write(); - break; - } - case MessageFactory.ADD_PARTITION_EVENT : { - AddPartitionMessage apm = md.getAddPartitionMessage(ev.getMessage()); - LOG.info("Processing#{} ADD_PARTITION message : {}", ev.getEventId(), ev.getMessage()); - Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs(); - if ((ptns == null) || (!ptns.iterator().hasNext())) { - LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions"); - break; - } - org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj(); - if (tobj == null){ - LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed"); - break; - } - - final Table qlMdTable = new Table(tobj); - Iterable<Partition> qlPtns = Iterables.transform( - ptns, - new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() { - @Nullable - @Override - public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) { - if (input == null) { - return null; - } - try { - return new Partition(qlMdTable, input); - } catch (HiveException e) { - throw new IllegalArgumentException(e); - } - } - } - ); - - Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(conf), - metaDataPath, - qlMdTable, - qlPtns, - replicationSpec); - - Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator(); - for (Partition qlPtn : qlPtns){ - PartitionFiles partitionFiles = partitionFilesIter.next(); - Iterable<String> files = partitionFiles.getFiles(); - if (files != null) { - // encoded filename/checksum of files, write into _files - Path ptnDataPath = new Path(evRoot, qlPtn.getName()); - FileSystem fs = ptnDataPath.getFileSystem(conf); - Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME); - BufferedWriter fileListWriter = new BufferedWriter( - new OutputStreamWriter(fs.create(filesPath))); - try { - for (String file : files) { - fileListWriter.write(file + "\n"); - } - } finally { - fileListWriter.close(); - } - } - } - - (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid, cmRoot)).write(); - break; - } - case MessageFactory.DROP_TABLE_EVENT : { - LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - break; - } - case MessageFactory.DROP_PARTITION_EVENT : { - LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - break; - } - case MessageFactory.ALTER_TABLE_EVENT : { - LOG.info("Processing#{} ALTER_TABLE message : {}", ev.getEventId(), ev.getMessage()); - AlterTableMessage atm = md.getAlterTableMessage(ev.getMessage()); - org.apache.hadoop.hive.metastore.api.Table tobjBefore = atm.getTableObjBefore(); - org.apache.hadoop.hive.metastore.api.Table tobjAfter = atm.getTableObjAfter(); - - if (tobjBefore.getDbName().equals(tobjAfter.getDbName()) && - tobjBefore.getTableName().equals(tobjAfter.getTableName())){ - // regular alter scenario - replicationSpec.setIsMetadataOnly(true); - Table qlMdTableAfter = new 
Table(tobjAfter); - Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(conf), - metaDataPath, - qlMdTableAfter, - null, - replicationSpec); - - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - } else { - // rename scenario - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - } - - break; - } - case MessageFactory.ALTER_PARTITION_EVENT : { - LOG.info("Processing#{} ALTER_PARTITION message : {}", ev.getEventId(), ev.getMessage()); - AlterPartitionMessage apm = md.getAlterPartitionMessage(ev.getMessage()); - org.apache.hadoop.hive.metastore.api.Table tblObj = apm.getTableObj(); - org.apache.hadoop.hive.metastore.api.Partition pobjBefore = apm.getPtnObjBefore(); - org.apache.hadoop.hive.metastore.api.Partition pobjAfter = apm.getPtnObjAfter(); - - boolean renameScenario = false; - Iterator<String> beforeValIter = pobjBefore.getValuesIterator(); - Iterator<String> afterValIter = pobjAfter.getValuesIterator(); - for ( ; beforeValIter.hasNext() ; ){ - if (!beforeValIter.next().equals(afterValIter.next())){ - renameScenario = true; - break; - } - } - - if (!renameScenario){ - // regular partition alter - replicationSpec.setIsMetadataOnly(true); - Table qlMdTable = new Table(tblObj); - List<Partition> qlPtns = new ArrayList<Partition>(); - qlPtns.add(new Partition(qlMdTable, pobjAfter)); - Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(conf), - metaDataPath, - qlMdTable, - qlPtns, - replicationSpec); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - break; - } else { - // rename scenario - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - break; - } - } - case MessageFactory.INSERT_EVENT: { - InsertMessage insertMsg = md.getInsertMessage(ev.getMessage()); - String dbName = insertMsg.getDB(); - String tblName = insertMsg.getTable(); - org.apache.hadoop.hive.metastore.api.Table tobj = db.getMSC().getTable(dbName, tblName); - Table qlMdTable = new Table(tobj); - Map<String, String> partSpec = insertMsg.getPartitionKeyValues(); - List<Partition> qlPtns = null; - if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) { - qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false)); - } - Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); - replicationSpec.setIsInsert(true); // Mark the replication type as insert into to avoid overwrite while import - EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns, - replicationSpec); - Iterable<String> files = insertMsg.getFiles(); - - if (files != null) { - // encoded filename/checksum of files, write into _files - Path dataPath = new Path(evRoot, EximUtil.DATA_PATH_NAME); - Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); - FileSystem fs = dataPath.getFileSystem(conf); - BufferedWriter fileListWriter = - new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); - - try { - for (String file : files) { - fileListWriter.write(file + "\n"); - } - } finally { - fileListWriter.close(); - } - } - - LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), 
ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - break; - } - // TODO : handle other event types - default: - LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid, cmRoot); - dmd.setPayload(ev.getMessage()); - dmd.write(); - break; - } + EventHandler.Context context = new EventHandler.Context( + evRoot, + cmRoot, + db, + conf, + getNewEventOnlyReplicationSpec(ev.getEventId()) + ); + EventHandlerFactory.handlerFor(ev).handle(context); } @@ -712,6 +493,37 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { return dbRoot; } + private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + private static final String FUNCTION_METADATA_DIR_NAME = "_metadata"; + private final static Logger SESSION_STATE_LOG = LoggerFactory.getLogger("SessionState"); + + private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException { + Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME); + try { + // TODO : This should ideally return the Function Objects and not Strings(function names) that should be done by the caller, Look at this separately. + List<String> functionNames = db.getFunctions(dbName, "*"); + for (String functionName : functionNames) { + org.apache.hadoop.hive.metastore.api.Function function = + db.getFunction(dbName, functionName); + if (function.getResourceUris().isEmpty()) { + SESSION_STATE_LOG.warn( + "Not replicating function: " + functionName + " as it seems to have been created " + + "without USING clause"); + continue; + } + + Path functionMetadataRoot = + new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME); + try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf), + functionMetadataRoot)) { + new FunctionSerializer(function).writeTo(jsonWriter, getNewReplicationSpec()); + } + } + } catch (Exception e) { + throw new SemanticException(e); + } + } + /** * * @param ast @@ -822,7 +634,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // At this point, all dump dirs should contain a _dumpmetadata file that // tells us what is inside that dumpdir. - DumpMetaData dmd = new DumpMetaData(loadPath); + DumpMetaData dmd = new DumpMetaData(loadPath, conf); boolean evDump = false; if (dmd.isIncrementalDump()){ @@ -900,7 +712,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // entire chain String locn = dir.getPath().toUri().toString(); - DumpMetaData eventDmd = new DumpMetaData(new Path(locn)); + DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); List<Task<? extends Serializable>> evTasks = analyzeEventLoad( dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, dbsUpdated, tablesUpdated, eventDmd); @@ -1020,7 +832,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { DropTableDesc dropTableDesc = new DropTableDesc( actualDbName + "." 
+ actualTblName, null, true, true, - getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom()))); + getNewEventOnlyReplicationSpec(dmd.getEventFrom())); Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf); if (precursor != null){ precursor.addDependentTask(dropTableTask); @@ -1044,7 +856,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { DropTableDesc dropPtnDesc = new DropTableDesc( actualDbName + "." + actualTblName, partSpecs, null, true, - getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom()))); + getNewEventOnlyReplicationSpec(dmd.getEventFrom())); Task<DDLWork> dropPtnTask = TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf); if (precursor != null) { @@ -1360,7 +1172,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string"); setFetchTask(createFetchTask("last_repl_id#string")); LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}", - String.valueOf(replLastId), ctx.getResFile()); + String.valueOf(replLastId), ctx.getResFile(), conf); } private void prepareReturnValues(List<String> values, String schema) throws SemanticException { @@ -1369,14 +1181,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.debug(" > " + s); } ctx.setResFile(ctx.getLocalTmpPath()); - writeOutput(values, ctx.getResFile()); + writeOutput(values, ctx.getResFile(), conf); } - private void writeOutput(List<String> values, Path outputFile) throws SemanticException { + private static void writeOutput(List<String> values, Path outputFile, HiveConf hiveConf) + throws SemanticException { FileSystem fs = null; DataOutputStream outStream = null; try { - fs = outputFile.getFileSystem(conf); + fs = outputFile.getFileSystem(hiveConf); outStream = fs.create(outputFile); outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); for (int i = 1; i < values.size(); i++) { @@ -1407,9 +1220,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { return new ReplicationSpec(true, false, evState, objState, false, true, false); } - // Use for replication states focussed on event only, where the obj state will be the event state - private ReplicationSpec getNewEventOnlyReplicationSpec(String evState) throws SemanticException { - return getNewReplicationSpec(evState, evState); + // Use for replication states focused on event only, where the obj state will be the event state + private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException { + return getNewReplicationSpec(eventId.toString(), eventId.toString()); } private Iterable<? extends String> matchesTbl(String dbName, String tblPattern) http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java new file mode 100644 index 0000000..40770de --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/DBSerializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; + +import java.io.IOException; + +public class DBSerializer implements JsonWriter.Serializer { + private final Database dbObject; + + public DBSerializer(Database dbObject) { + this.dbObject = dbObject; + } + + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + dbObject.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState() + ); + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + try { + String value = serializer.toString(dbObject, "UTF-8"); + writer.jsonGenerator.writeStringField("db", value); + } catch (TException e) { + throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); + } + } +} + + http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java new file mode 100644 index 0000000..6b03766 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/FunctionSerializer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; + +import java.io.IOException; + +public class FunctionSerializer implements JsonWriter.Serializer { + private Function function; + + public FunctionSerializer(Function function) { + this.function = function; + } + + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + try { + writer.jsonGenerator + .writeStringField("function", serializer.toString(function, "UTF-8")); + } catch (TException e) { + throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java new file mode 100644 index 0000000..1aa1195 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/JsonWriter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; + +import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_VERSION; + +public class JsonWriter implements Closeable { + + final JsonGenerator jsonGenerator; + + public JsonWriter(FileSystem fs, Path writePath) throws IOException { + OutputStream out = fs.create(writePath); + jsonGenerator = new JsonFactory().createJsonGenerator(out); + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField("version", METADATA_FORMAT_VERSION); + } + + @Override + public void close() throws IOException { + jsonGenerator.writeEndObject(); + jsonGenerator.close(); + } + + public interface Serializer { + void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws + SemanticException, IOException; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java new file mode 100644 index 0000000..313d108 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionSerializer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; + +import java.io.IOException; +import java.util.Map; + +class PartitionSerializer implements JsonWriter.Serializer { + private Partition partition; + + PartitionSerializer(Partition partition) { + this.partition = partition; + } + + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + try { + if (additionalPropertiesProvider.isInReplicationScope()) { + partition.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); + if (isPartitionExternal()) { + // Replication destination will not be external + partition.putToParameters("EXTERNAL", "FALSE"); + } + } + writer.jsonGenerator.writeString(serializer.toString(partition, "UTF-8")); + writer.jsonGenerator.flush(); + } catch (TException e) { + throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); + } + } + + private boolean isPartitionExternal() { + Map<String, String> params = partition.getParameters(); + return params.containsKey("EXTERNAL") + && params.get("EXTERNAL").equalsIgnoreCase("TRUE"); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java new file mode 100644 index 0000000..d88a553 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ReplicationSpecSerializer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.io.IOException; + +public class ReplicationSpecSerializer implements JsonWriter.Serializer { + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + for (ReplicationSpec.KEY key : ReplicationSpec.KEY.values()) { + String value = additionalPropertiesProvider.get(key); + if (value != null) { + writer.jsonGenerator.writeStringField(key.toString(), value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java new file mode 100644 index 0000000..a2e258f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TJSONProtocol; + +import java.io.IOException; +import java.util.Map; + +public class TableSerializer implements JsonWriter.Serializer { + private final org.apache.hadoop.hive.ql.metadata.Table tableHandle; + private final Iterable<Partition> partitions; + + public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle, + Iterable<Partition> partitions) { + this.tableHandle = tableHandle; + this.partitions = partitions; + } + + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + if (cannotReplicateTable(additionalPropertiesProvider)) { + return; + } + + Table tTable = tableHandle.getTTable(); + tTable = addPropertiesToTable(tTable, additionalPropertiesProvider); + try { + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + writer.jsonGenerator + .writeStringField("table", serializer.toString(tTable, "UTF-8")); + writer.jsonGenerator.writeFieldName("partitions"); + writePartitions(writer, additionalPropertiesProvider); + } catch (TException e) { + throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); + } + } + + private boolean cannotReplicateTable(ReplicationSpec additionalPropertiesProvider) { + return tableHandle == null || additionalPropertiesProvider.isNoop(); + } + + private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + if (additionalPropertiesProvider.isInReplicationScope()) { + table.putToParameters( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); + if (isExternalTable(table)) { + // Replication destination will not be external - override if set + table.putToParameters("EXTERNAL", "FALSE"); + } + if (isExternalTableType(table)) { + // Replication dest will not be external - override if set + table.setTableType(TableType.MANAGED_TABLE.toString()); + } + } else { + // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE; + // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\""); + // TODO: if we want to be explicit about this dump not being a replication dump, we can + // uncomment this else section, but currently unneeded. Will require a lot of golden file + // regen if we do so.
+ } + return table; + } + + private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) { + return table.isSetTableType() + && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()); + } + + private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) { + Map<String, String> params = table.getParameters(); + return params.containsKey("EXTERNAL") + && params.get("EXTERNAL").equalsIgnoreCase("TRUE"); + } + + private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + writer.jsonGenerator.writeStartArray(); + if (partitions != null) { + for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { + new PartitionSerializer(partition.getTPartition()) + .writeTo(writer, additionalPropertiesProvider); + } + } + writer.jsonGenerator.writeEndArray(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java new file mode 100644 index 0000000..3ebc803 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/VersionCompatibleSerializer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump; + +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.io.IOException; + +import static org.apache.hadoop.hive.ql.parse.EximUtil.METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION; + +/** + * This is not used as of now, as the conditional which led to its usage is always false; + * hence we have removed the conditional and the usage of this class, but it might be required in future.
+ */ +public class VersionCompatibleSerializer implements JsonWriter.Serializer { + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + writer.jsonGenerator.writeStringField("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java new file mode 100644 index 0000000..ab059c2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractHandler implements EventHandler { + static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class); + + final NotificationEvent event; + final MessageDeserializer deserializer; + + AbstractHandler(NotificationEvent event) { + this.event = event; + deserializer = MessageFactory.getInstance().getDeserializer(); + } + + @Override + public long fromEventId() { + return event.getEventId(); + } + + @Override + public long toEventId() { + return event.getEventId(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java new file mode 100644 index 0000000..9a4f8b9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import javax.annotation.Nullable; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Iterator; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; + +public class AddPartitionHandler extends AbstractHandler { + protected AddPartitionHandler(NotificationEvent notificationEvent) { + super(notificationEvent); + } + + @Override + public void handle(Context withinContext) throws Exception { + AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage()); + LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage()); + Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs(); + if ((ptns == null) || (!ptns.iterator().hasNext())) { + LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", fromEventId()); + return; + } + org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj(); + if (tobj == null) { + LOG.debug("Event#{} was an ADD_PTN_EVENT with no table listed", fromEventId()); + return; + } + + final Table qlMdTable = new Table(tobj); + Iterable<Partition> qlPtns = Iterables.transform( + ptns, + new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() { + @Nullable + @Override + public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) { + if (input == null) { + return null; + } + try { + return new Partition(qlMdTable, input); + } catch (HiveException e) { + throw new IllegalArgumentException(e); + } + } + } + ); + + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + qlPtns, + withinContext.replicationSpec); + + Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator(); + for (Partition qlPtn : qlPtns) { + Iterable<String> files = partitionFilesIter.next().getFiles(); + if (files != null) { + // encoded filename/checksum of files, write into _files + try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) { + for (String file : files) { + fileListWriter.write(file + "\n"); + } + } + } + } + withinContext.createDmd(this).write(); + } + + private BufferedWriter writer(Context withinContext, Partition qlPtn) + throws IOException { + Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName()); + FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf); +
Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME); + return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + } + + @Override + public DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_ADD_PARTITION; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java new file mode 100644 index 0000000..1073cd0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; + +public class AlterPartitionHandler extends AbstractHandler { + private final org.apache.hadoop.hive.metastore.api.Partition after; + private final org.apache.hadoop.hive.metastore.api.Table tableObject; + private final Scenario scenario; + + AlterPartitionHandler(NotificationEvent event) throws Exception { + super(event); + AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage()); + tableObject = apm.getTableObj(); + org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore(); + after = apm.getPtnObjAfter(); + scenario = scenarioType(before, after); + } + + private enum Scenario { + ALTER { + @Override + DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_ALTER_PARTITION; + } + }, + RENAME { + @Override + DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_RENAME_PARTITION; + } + }; + + abstract DUMPTYPE dumpType(); + } + + private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before, + org.apache.hadoop.hive.metastore.api.Partition after) { + Iterator<String> beforeValIter = before.getValuesIterator(); + Iterator<String> afterValIter = after.getValuesIterator(); + while(beforeValIter.hasNext()) { + if (!beforeValIter.next().equals(afterValIter.next())) { + return Scenario.RENAME; + } + } + return Scenario.ALTER; + } + 
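+ // Illustration (hypothetical values): a partition changed from [2017, 03] to
+ // [2017, 04] is detected as Scenario.RENAME, while identical value lists
+ // (e.g. only a storage-descriptor change) are detected as Scenario.ALTER.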
+ @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage()); + + if (Scenario.ALTER == scenario) { + withinContext.replicationSpec.setIsMetadataOnly(true); + Table qlMdTable = new Table(tableObject); + List<Partition> qlPtns = new ArrayList<>(); + qlPtns.add(new Partition(qlMdTable, after)); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + qlPtns, + withinContext.replicationSpec); + } + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DUMPTYPE dumpType() { + return scenario.dumpType(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java new file mode 100644 index 0000000..04d9d79 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; + +public class AlterTableHandler extends AbstractHandler { + private final org.apache.hadoop.hive.metastore.api.Table before; + private final org.apache.hadoop.hive.metastore.api.Table after; + private final Scenario scenario; + + private enum Scenario { + ALTER { + @Override + DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_ALTER_TABLE; + } + }, + RENAME { + @Override + DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_RENAME_TABLE; + } + }; + + abstract DUMPTYPE dumpType(); + } + + AlterTableHandler(NotificationEvent event) throws Exception { + super(event); + AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage()); + before = atm.getTableObjBefore(); + after = atm.getTableObjAfter(); + scenario = scenarioType(before, after); + } + + private static Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before, + org.apache.hadoop.hive.metastore.api.Table after) { + return before.getDbName().equals(after.getDbName()) + && before.getTableName().equals(after.getTableName()) + ? Scenario.ALTER + : Scenario.RENAME; + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage()); + if (Scenario.ALTER == scenario) { + withinContext.replicationSpec.setIsMetadataOnly(true); + Table qlMdTableAfter = new Table(after); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTableAfter, + null, + withinContext.replicationSpec); + } + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DUMPTYPE dumpType() { + return scenario.dumpType(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java new file mode 100644 index 0000000..03f400d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; + +public class CreateTableHandler extends AbstractHandler { + + CreateTableHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage()); + LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage()); + org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj(); + + if (tobj == null) { + LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed", fromEventId()); + return; + } + + Table qlMdTable = new Table(tobj); + if (qlMdTable.isView()) { + withinContext.replicationSpec.setIsMetadataOnly(true); + } + + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + null, + withinContext.replicationSpec); + + Path dataPath = new Path(withinContext.eventRoot, "data"); + Iterable<String> files = ctm.getFiles(); + if (files != null) { + // encoded filename/checksum of files, write into _files + try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { + for (String file : files) { + fileListWriter.write(file + "\n"); + } + } + } + withinContext.createDmd(this).write(); + } + + private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { + FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); + Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); + return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + } + + @Override + public DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_CREATE_TABLE; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java new file mode 100644 index 0000000..61c5f37 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; + +public class DefaultHandler extends AbstractHandler { + + DefaultHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java new file mode 100644 index 0000000..3ad794e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; + +public class DropPartitionHandler extends AbstractHandler { + + DropPartitionHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_DROP_PARTITION; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java new file mode 100644 index 0000000..cae379b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; + +public class DropTableHandler extends AbstractHandler { + + DropTableHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DUMPTYPE dumpType() { + return DUMPTYPE.EVENT_DROP_TABLE; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2985262b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java new file mode 100644 index 0000000..199145a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.events; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; + +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DumpMetaData; +import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.DUMPTYPE; + +public interface EventHandler { + void handle(Context withinContext) throws Exception; + + long fromEventId(); + + long toEventId(); + + DUMPTYPE dumpType(); + + class Context { + final Path eventRoot, cmRoot; + final Hive db; + final HiveConf hiveConf; + final ReplicationSpec replicationSpec; + + public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, + ReplicationSpec replicationSpec) { + this.eventRoot = eventRoot; + this.cmRoot = cmRoot; + this.db = db; + this.hiveConf = hiveConf; + this.replicationSpec = replicationSpec; + } + + DumpMetaData createDmd(EventHandler eventHandler) { + return new DumpMetaData( + eventRoot, + eventHandler.dumpType(), + eventHandler.fromEventId(), + eventHandler.toEventId(), + cmRoot, hiveConf + ); + } + } +}
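For orientation, the sketch below shows how these pieces fit together during an incremental dump: each NotificationEvent gets its own event directory, a handler is selected for the event type, and the handler is driven through a Context. This is a minimal illustration, not verbatim from the patch; the factory method handlerFor(...) and the dumpRoot/<eventId> directory layout are assumptions based on the files in this commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler;
import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory;

class EventDumpSketch {
  // Dumps a single notification event under dumpRoot/<eventId> (layout assumed).
  static void dumpEvent(NotificationEvent ev, Path dumpRoot, Path cmRoot, Hive db,
      HiveConf conf, ReplicationSpec spec) throws Exception {
    Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
    // handlerFor(...) is assumed to map the event type to one of the handlers above,
    // falling back to DefaultHandler for unrecognized event types.
    EventHandler handler = EventHandlerFactory.handlerFor(ev);
    // handle(...) writes the event's _metadata/_files under eventRoot; the handlers
    // then record the dump type and event-id range via Context.createDmd(...).
    handler.handle(new EventHandler.Context(eventRoot, cmRoot, db, conf, spec));
  }
}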