Repository: hive Updated Branches: refs/heads/master 5df15407f -> 8deb77940
HIVE-17366: Constraint replication in bootstrap (Daniel Dai, reviewed by Sankar Hariappan, Thejas Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8deb7794 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8deb7794 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8deb7794 Branch: refs/heads/master Commit: 8deb7794093169049fa80d224fc82a2561567717 Parents: 5df1540 Author: Daniel Dai <da...@hortonworks.com> Authored: Sun Sep 10 16:17:08 2017 -0700 Committer: Daniel Dai <da...@hortonworks.com> Committed: Sun Sep 10 16:17:08 2017 -0700 ---------------------------------------------------------------------- .../hive/ql/parse/TestReplicationScenarios.java | 9 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 22 +++- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 32 +++++ .../ql/exec/repl/bootstrap/ReplLoadTask.java | 32 ++++- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 9 +- .../repl/bootstrap/events/BootstrapEvent.java | 2 +- .../repl/bootstrap/events/ConstraintEvent.java | 24 ++++ .../filesystem/ConstraintEventsIterator.java | 90 ++++++++++++++ .../events/filesystem/FSConstraintEvent.java | 39 ++++++ .../repl/bootstrap/load/LoadConstraint.java | 119 +++++++++++++++++++ .../apache/hadoop/hive/ql/metadata/Hive.java | 42 +++++++ .../apache/hadoop/hive/ql/parse/EximUtil.java | 2 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 1 + .../repl/dump/io/ConstraintsSerializer.java | 71 +++++++++++ .../repl/load/message/AddForeignKeyHandler.java | 16 ++- .../message/AddNotNullConstraintHandler.java | 7 +- .../repl/load/message/AddPrimaryKeyHandler.java | 7 +- .../message/AddUniqueConstraintHandler.java | 6 +- 18 files changed, 503 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 6a2e400..5eae1fe 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -2776,16 +2776,15 @@ public class TestReplicationScenarios { LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId); run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'", driverMirror); - // bootstrap replication for constraint is not implemented. Will verify it works once done try { List<SQLPrimaryKey> pks = metaStoreClientMirror.getPrimaryKeys(new PrimaryKeysRequest(dbName+ "_dupe" , "tbl1")); - assertTrue(pks.isEmpty()); + assertEquals(pks.size(), 1); List<SQLUniqueConstraint> uks = metaStoreClientMirror.getUniqueConstraints(new UniqueConstraintsRequest(dbName+ "_dupe" , "tbl1")); - assertTrue(uks.isEmpty()); + assertEquals(uks.size(), 1); List<SQLForeignKey> fks = metaStoreClientMirror.getForeignKeys(new ForeignKeysRequest(null, null, dbName+ "_dupe" , "tbl2")); - assertTrue(fks.isEmpty()); + assertEquals(fks.size(), 1); List<SQLNotNullConstraint> nns = metaStoreClientMirror.getNotNullConstraints(new NotNullConstraintsRequest(dbName+ "_dupe" , "tbl3")); - assertTrue(nns.isEmpty()); + assertEquals(nns.size(), 1); } catch (TException te) { assertNull(te); } http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 646bb23..bb73d28 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -4088,14 +4089,23 @@ public class DDLTask extends Task<DDLWork> implements Serializable { throws SemanticException, HiveException { try { // This is either an alter table add foreign key or add primary key command. - if (alterTbl.getForeignKeyCols() != null - && !alterTbl.getForeignKeyCols().isEmpty()) { - db.addForeignKey(alterTbl.getForeignKeyCols()); - } - if (alterTbl.getPrimaryKeyCols() != null - && !alterTbl.getPrimaryKeyCols().isEmpty()) { + if (alterTbl.getPrimaryKeyCols() != null && !alterTbl.getPrimaryKeyCols().isEmpty()) { db.addPrimaryKey(alterTbl.getPrimaryKeyCols()); } + if (alterTbl.getForeignKeyCols() != null && !alterTbl.getForeignKeyCols().isEmpty()) { + try { + db.addForeignKey(alterTbl.getForeignKeyCols()); + } catch (HiveException e) { + if (e.getCause() instanceof InvalidObjectException + && alterTbl.getReplicationSpec()!= null && alterTbl.getReplicationSpec().isInReplicationScope()) { + // During repl load, NoSuchObjectException in foreign key shall + // ignore as the foreign table may not be part of the replication + LOG.debug(e.getMessage()); + } else { + throw e; + } + } + } if (alterTbl.getUniqueConstraintCols() != null && !alterTbl.getUniqueConstraintCols().isEmpty()) { db.addUniqueConstraint(alterTbl.getUniqueConstraintCols()); http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 95eb2db..3cae543 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -25,7 +25,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.EventUtils; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; @@ -49,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; +import org.apache.hadoop.hive.ql.parse.repl.dump.io.ConstraintsSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; @@ -66,6 +72,7 @@ import java.util.UUID; public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); @@ -194,6 +201,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); + dumpConstraintMetadata(dbName, tblName, dbRoot); } Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey); replLogger.endLog(bootDumpBeginReplId.toString()); @@ -302,6 +310,30 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } + private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception { + try { + Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME); + Path constraintsFile = new Path(constraintsRoot, tblName); + Hive db = getHive(); + List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName); + List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName); + List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName); + List<SQLNotNullConstraint> nns = db.getNotNullConstraintList(dbName, tblName); + if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty()) + || (nns != null && !nns.isEmpty())) { + try (JsonWriter jsonWriter = + new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); + serializer.writeTo(jsonWriter, null); + } + } + } catch (NoSuchObjectException e) { + // Bootstrap constraint dump shouldn't fail if the table is dropped/renamed while dumping it. + // Just log a debug message and skip it. + LOG.debug(e.getMessage()); + } + } + private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) { try { HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName); http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index cd31b17..706d0b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -25,11 +25,14 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; @@ -77,6 +80,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { a database ( directory ) */ BootstrapEventsIterator iterator = work.iterator(); + ConstraintEventsIterator constraintIterator = work.constraintIterator(); /* This is used to get hold of a reference during the current creation of tasks and is initialized with "0" tasks such that it will be non consequential in any operations done with task tracker @@ -85,8 +89,17 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); Scope scope = new Scope(); - while (iterator.hasNext() && loadTaskTracker.canAddMoreTasks()) { - BootstrapEvent next = iterator.next(); + boolean loadingConstraint = false; + if (!iterator.hasNext() && constraintIterator.hasNext()) { + loadingConstraint = true; + } + while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) { + BootstrapEvent next; + if (!loadingConstraint) { + next = iterator.next(); + } else { + next = constraintIterator.next(); + } switch (next.eventType()) { case Database: DatabaseEvent dbEvent = (DatabaseEvent) next; @@ -173,15 +186,24 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { functionsTracker.debugLog("functions"); break; } + case Constraint: { + LoadConstraint loadConstraint = + new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker constraintTracker = loadConstraint.tasks(); + scope.rootTasks.addAll(constraintTracker.tasks()); + loadTaskTracker.update(constraintTracker); + constraintTracker.debugLog("constraints"); + } } - if (!iterator.currentDbHasNext()) { + if (!loadingConstraint && !iterator.currentDbHasNext()) { createEndReplLogTask(context, scope, iterator.replLogger()); } } - boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState(); + boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() + || constraintIterator.hasNext(); createBuilderTask(scope.rootTasks, addAnotherLoadTask); - if (!iterator.hasNext()) { + if (!iterator.hasNext() && !constraintIterator.hasNext()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); work.updateDbEventState(null); } http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index f51afe1..a8e9067 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; import org.apache.hadoop.hive.ql.plan.Explain; import java.io.IOException; @@ -32,6 +33,7 @@ public class ReplLoadWork implements Serializable { final String dbNameToLoadIn; final String tableNameToLoadIn; private final BootstrapEventsIterator iterator; + private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; @@ -39,6 +41,7 @@ public class ReplLoadWork implements Serializable { String tableNameToLoadIn) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } @@ -51,6 +54,10 @@ public class ReplLoadWork implements Serializable { return iterator; } + public ConstraintEventsIterator constraintIterator() { + return constraintsIterator; + } + int executedLoadTask() { return ++loadTaskRunCount; } @@ -67,4 +74,4 @@ public class ReplLoadWork implements Serializable { boolean hasDbState() { return state != null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java index db2b0ac..7b7aac9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java @@ -22,7 +22,7 @@ public interface BootstrapEvent { EventType eventType(); enum EventType { - Database, Table, Function, Partition + Database, Table, Function, Partition, Constraint } } http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java new file mode 100644 index 0000000..7429283 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/ConstraintEvent.java @@ -0,0 +1,24 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; + +import org.apache.hadoop.fs.Path; + +public interface ConstraintEvent extends BootstrapEvent { + Path rootDir(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java new file mode 100644 index 0000000..12d4c0d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -0,0 +1,90 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; + +public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> { + private FileStatus[] dbDirs; + private int currentDbIndex; + private FileStatus[] constraintFiles = null; + private int currentConstraintIndex; + private FileSystem fs; + private Path path; + + public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException { + path = new Path(dumpDirectory); + fs = path.getFileSystem(hiveConf); + } + + private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) { + try { + return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME)); + } catch (FileNotFoundException e) { + return new FileStatus[]{}; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + if (dbDirs == null) { + try { + dbDirs = fs.listStatus(path, EximUtil.getDirectoryFilter(fs)); + } catch (IOException e) { + throw new RuntimeException(e); + } + currentDbIndex = 0; + if (dbDirs.length != 0) { + currentConstraintIndex = 0; + constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath()); + } + } + if ((currentDbIndex < dbDirs.length) && (currentConstraintIndex < constraintFiles.length)) { + return true; + } + while ((currentDbIndex < dbDirs.length) && (currentConstraintIndex == constraintFiles.length)) { + currentDbIndex ++; + if (currentDbIndex < dbDirs.length) { + currentConstraintIndex = 0; + constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath()); + } else { + constraintFiles = null; + } + } + return constraintFiles != null; + } + + @Override + public FSConstraintEvent next() { + int thisIndex = currentConstraintIndex; + currentConstraintIndex++; + return new FSConstraintEvent(constraintFiles[thisIndex].getPath()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java new file mode 100644 index 0000000..a2ad444 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSConstraintEvent.java @@ -0,0 +1,39 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; + +public class FSConstraintEvent implements ConstraintEvent { + private final Path rootDir; + + FSConstraintEvent(Path rootDir) { + this.rootDir = rootDir; + } + + @Override + public Path rootDir() { + return rootDir; + } + + @Override + public EventType eventType() { + return EventType.Constraint; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java new file mode 100644 index 0000000..fc2aa8d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -0,0 +1,119 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.message.AddForeignKeyHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.AddNotNullConstraintHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.AddPrimaryKeyHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.AddUniqueConstraintHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes; + +public class LoadConstraint { + private static final Logger LOG = LoggerFactory.getLogger(LoadFunction.class); + private Context context; + private final ConstraintEvent event; + private final String dbNameToLoadIn; + private final TaskTracker tracker; + + public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn, + TaskTracker existingTracker) { + this.context = context; + this.event = event; + this.dbNameToLoadIn = dbNameToLoadIn; + this.tracker = new TaskTracker(existingTracker); + } + + public TaskTracker tasks() throws IOException, SemanticException { + URI fromURI = EximUtil + .getValidatedURI(context.hiveConf, stripQuotes(event.rootDir().toUri().toString())); + Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + + try { + FileSystem fs = FileSystem.get(fromPath.toUri(), context.hiveConf); + JSONObject json = new JSONObject(EximUtil.readAsString(fs, fromPath)); + String pksString = json.getString("pks"); + String fksString = json.getString("fks"); + String uksString = json.getString("uks"); + String nnsString = json.getString("nns"); + List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); + + AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler(); + DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + pkDumpMetaData.setPayload(pksString); + tasks.addAll(pkHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + + AddForeignKeyHandler fkHandler = new AddForeignKeyHandler(); + DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + fkDumpMetaData.setPayload(fksString); + tasks.addAll(fkHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + + AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler(); + DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + ukDumpMetaData.setPayload(uksString); + tasks.addAll(ukHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + + AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler(); + DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + nnDumpMetaData.setPayload(nnsString); + tasks.addAll(nnHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + + tasks.forEach(tracker::addTask); + return tracker; + } catch (Exception e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index d661f10..aa44c62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -4203,6 +4203,48 @@ private void constructOneLBLocationMap(FileStatus fSta, throws HiveException, NoSuchObjectException { try { getMSC().dropConstraint(dbName, tableName, constraintName); + } catch (NoSuchObjectException e) { + throw e; + } catch (Exception e) { + throw new HiveException(e); + } + } + + public List<SQLPrimaryKey> getPrimaryKeyList(String dbName, String tblName) throws HiveException, NoSuchObjectException { + try { + return getMSC().getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; + } catch (Exception e) { + throw new HiveException(e); + } + } + + public List<SQLForeignKey> getForeignKeyList(String dbName, String tblName) throws HiveException, NoSuchObjectException { + try { + return getMSC().getForeignKeys(new ForeignKeysRequest(null, null, dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; + } catch (Exception e) { + throw new HiveException(e); + } + } + + public List<SQLUniqueConstraint> getUniqueConstraintList(String dbName, String tblName) throws HiveException, NoSuchObjectException { + try { + return getMSC().getUniqueConstraints(new UniqueConstraintsRequest(dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; + } catch (Exception e) { + throw new HiveException(e); + } + } + + public List<SQLNotNullConstraint> getNotNullConstraintList(String dbName, String tblName) throws HiveException, NoSuchObjectException { + try { + return getMSC().getNotNullConstraints(new NotNullConstraintsRequest(dbName, tblName)); + } catch (NoSuchObjectException e) { + throw e; } catch (Exception e) { throw new HiveException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 76331fc..ece5495 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -287,7 +287,7 @@ public class EximUtil { } } - private static String readAsString(final FileSystem fs, final Path fromMetadataPath) + public static String readAsString(final FileSystem fs, final Path fromMetadataPath) throws IOException { try (FSDataInputStream stream = fs.open(fromMetadataPath)) { byte[] buffer = new byte[1024]; http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 7794d3e..8f6316d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -78,6 +78,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; + public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java new file mode 100644 index 0000000..2ae9f58 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.dump.io; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +public class ConstraintsSerializer implements JsonWriter.Serializer { + private HiveConf hiveConf; + private List<SQLPrimaryKey> pks; + private List<SQLForeignKey> fks; + private List<SQLUniqueConstraint> uks; + private List<SQLNotNullConstraint> nns; + + public ConstraintsSerializer(List<SQLPrimaryKey> pks, List<SQLForeignKey> fks, + List<SQLUniqueConstraint> uks, List<SQLNotNullConstraint> nns, HiveConf hiveConf) { + this.hiveConf = hiveConf; + this.pks = pks; + this.fks = fks; + this.uks = uks; + this.nns = nns; + } + + @Override + public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) + throws SemanticException, IOException { + String pksString, fksString, uksString, nnsString; + pksString = fksString = uksString = nnsString = ""; + if (pks != null) { + pksString = MessageFactory.getInstance().buildAddPrimaryKeyMessage(pks).toString(); + } + if (fks != null) { + fksString = MessageFactory.getInstance().buildAddForeignKeyMessage(fks).toString(); + } + if (uks != null) { + uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString(); + } + if (nns != null) { + nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString(); + } + writer.jsonGenerator.writeStringField("pks", pksString); + writer.jsonGenerator.writeStringField("fks", fksString); + writer.jsonGenerator.writeStringField("uks", uksString); + writer.jsonGenerator.writeStringField("nns", nnsString); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java index 0873c1c..0fd970a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java @@ -49,19 +49,27 @@ public class AddForeignKeyHandler extends AbstractMessageHandler { } } + List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); + if (fks.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? fks.get(0).getFktable_db() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? fks.get(0).getPktable_name() : context.tableName; + String actualTblName = context.isTableNameEmpty() ? fks.get(0).getFktable_name() : context.tableName; for (SQLForeignKey fk : fks) { - fk.setPktable_db(actualDbName); - fk.setPktable_name(actualTblName); + // If parent table is in the same database, change it to the actual db on destination + // Otherwise, keep db name + if (fk.getPktable_db().equals(fk.getFktable_db())) { + fk.setPktable_db(actualDbName); + } fk.setFktable_db(actualDbName); + fk.setFktable_name(actualTblName); } AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, new ArrayList<SQLPrimaryKey>(), fks, new ArrayList<SQLUniqueConstraint>(), context.eventOnlyReplicationSpec()); Task<DDLWork> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java index 76cbe5a..9c12e7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; -import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -51,6 +50,11 @@ public class AddNotNullConstraintHandler extends AbstractMessageHandler { } } + List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); + if (nns.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? nns.get(0).getTable_db() : context.dbName; String actualTblName = context.isTableNameEmpty() ? nns.get(0).getTable_name() : context.tableName; @@ -62,7 +66,6 @@ public class AddNotNullConstraintHandler extends AbstractMessageHandler { AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, new ArrayList<SQLPrimaryKey>(), new ArrayList<SQLForeignKey>(), new ArrayList<SQLUniqueConstraint>(), nns, context.eventOnlyReplicationSpec()); Task<DDLWork> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java index aee46da..d7ee223 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java @@ -48,6 +48,12 @@ public class AddPrimaryKeyHandler extends AbstractMessageHandler { throw (SemanticException)e; } } + + List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); + if (pks.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? pks.get(0).getTable_db() : context.dbName; String actualTblName = context.isTableNameEmpty() ? pks.get(0).getTable_name() : context.tableName; @@ -59,7 +65,6 @@ public class AddPrimaryKeyHandler extends AbstractMessageHandler { AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, pks, new ArrayList<SQLForeignKey>(), new ArrayList<SQLUniqueConstraint>(), context.eventOnlyReplicationSpec()); Task<DDLWork> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); http://git-wip-us.apache.org/repos/asf/hive/blob/8deb7794/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java index f0cb11e..0d9c700 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java @@ -49,6 +49,11 @@ public class AddUniqueConstraintHandler extends AbstractMessageHandler { } } + List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); + if (uks.isEmpty()) { + return tasks; + } + String actualDbName = context.isDbNameEmpty() ? uks.get(0).getTable_db() : context.dbName; String actualTblName = context.isTableNameEmpty() ? uks.get(0).getTable_name() : context.tableName; @@ -60,7 +65,6 @@ public class AddUniqueConstraintHandler extends AbstractMessageHandler { AlterTableDesc addConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, new ArrayList<SQLPrimaryKey>(), new ArrayList<SQLForeignKey>(), uks, context.eventOnlyReplicationSpec()); Task<DDLWork> addConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, addConstraintsDesc), context.hiveConf); - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); tasks.add(addConstraintsTask); context.log.debug("Added add constrains task : {}:{}", addConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);