http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index f7018c2..ac1d3c8 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -59,6 +59,7 @@ public class ReplChangeManager { static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; private static final String URI_FRAGMENT_SEPARATOR = "#"; public static final String SOURCE_OF_REPLICATION = "repl.source.for"; + private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]"; public enum RecycleType { MOVE, @@ -472,7 +473,6 @@ public class ReplChangeManager { } public static boolean isSourceOfReplication(Database db) { - // Can not judge, so assuming replication is not enabled. assert (db != null); String replPolicyIds = getReplPolicyIdString(db); return !StringUtils.isEmpty(replPolicyIds); @@ -490,4 +490,12 @@ public class ReplChangeManager { } return null; } + + public static String joinWithSeparator(Iterable<?> strings) { + return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings); + } + + public static String[] getListFromSeparatedString(String commaSeparatedString) { + return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*"); + } }
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index a526019..8ff056f 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; @@ -2414,6 +2415,17 @@ public class CachedStore implements RawStore, Configurable { return sharedCache.getUpdateCount(); } + @Override + public void cleanWriteNotificationEvents(int olderThan) { + rawStore.cleanWriteNotificationEvents(olderThan); + } + + + @Override + public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException { + return rawStore.getAllWriteEventInfo(txnId, dbName, tableName); + } + static boolean isNotInBlackList(String catName, String dbName, String tblName) { String str = TableName.getQualified(catName, dbName, tblName); for (Pattern pattern : blacklistPatterns) { http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java new file mode 100644 index 0000000..001179a --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest; +import org.apache.hadoop.hive.metastore.utils.StringUtils; + +import java.util.List; + +/** + * AcidWriteEvent + * Event generated for acid write operations + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class AcidWriteEvent extends ListenerEvent { + private final WriteNotificationLogRequest writeNotificationLogRequest; + private final String partition; + private final Table tableObj; + private final Partition partitionObj; + + public AcidWriteEvent(String partition, Table tableObj, Partition partitionObj, + WriteNotificationLogRequest writeNotificationLogRequest) { + super(true, null); + this.writeNotificationLogRequest = writeNotificationLogRequest; + this.partition = partition; + this.tableObj = tableObj; + this.partitionObj = partitionObj; + } + + public Long getTxnId() { + return writeNotificationLogRequest.getTxnId(); + } + + public List<String> getFiles() { + return writeNotificationLogRequest.getFileInfo().getFilesAdded(); + } + + public List<String> getChecksums() { + return writeNotificationLogRequest.getFileInfo().getFilesAddedChecksum(); + } + + public String getDatabase() { + return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getDb()); + } + + public String getTable() { + return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getTable()); + } + + public String getPartition() { + return partition; //Don't normalize partition value, as its case sensitive. + } + + public Long getWriteId() { + return writeNotificationLogRequest.getWriteId(); + } + + public Table getTableObj() { + return tableObj; + } + + public Partition getPartitionObj() { + return partitionObj; + } + + public List<String> getSubDirs() { + return writeNotificationLogRequest.getFileInfo().getSubDirectoryList(); + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java new file mode 100644 index 0000000..e2c9ccf --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java @@ -0,0 +1,50 @@ +/* * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.List; + +/** + * HCat message sent when an ACID write is done. + */ +public abstract class AcidWriteMessage extends EventMessage { + + protected AcidWriteMessage() { + super(EventType.ACID_WRITE); + } + + public abstract Long getTxnId(); + + public abstract String getTable(); + + public abstract Long getWriteId(); + + public abstract String getPartition(); + + public abstract List<String> getFiles(); + + public abstract Table getTableObj() throws Exception; + + public abstract Partition getPartitionObj() throws Exception; + + public abstract String getTableObjStr(); + + public abstract String getPartitionObjStr(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java index 49004f2..9733039 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java @@ -17,6 +17,12 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; + +import java.util.List; + /** * HCat message sent when an commit transaction is done. */ @@ -33,4 +39,21 @@ public abstract class CommitTxnMessage extends EventMessage { */ public abstract Long getTxnId(); + public abstract List<Long> getWriteIds(); + + public abstract List<String> getDatabases(); + + public abstract List<String> getTables(); + + public abstract List<String> getPartitions(); + + public abstract Table getTableObj(int idx) throws Exception; + + public abstract Partition getPartitionObj(int idx) throws Exception; + + public abstract String getFiles(int idx); + + public abstract List<String> getFilesList(); + + public abstract void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList); } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java index 969dd7b..f24b419 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -60,7 +60,8 @@ public abstract class EventMessage { COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT), ABORT_TXN(MessageFactory.ABORT_TXN_EVENT), ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT), - ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT); + ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT), + ACID_WRITE(MessageFactory.ACID_WRITE_EVENT); private String typeString; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java index ca33579..b701d84 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java @@ -70,6 +70,10 @@ public abstract class MessageDeserializer { return getCommitTxnMessage(messageBody); case ABORT_TXN: return getAbortTxnMessage(messageBody); + case ALLOC_WRITE_ID: + return getAllocWriteIdMessage(messageBody); + case ACID_WRITE: + return getAcidWriteMessage(messageBody); default: throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString); } @@ -186,6 +190,11 @@ public abstract class MessageDeserializer { */ public abstract AllocWriteIdMessage getAllocWriteIdMessage(String messageBody); + /* + * Method to de-serialize AcidWriteMessage instance. + */ + public abstract AcidWriteMessage getAcidWriteMessage(String messageBody); + // Protection against construction. protected MessageDeserializer() {} } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index e0629ea..d529147 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import java.util.Iterator; @@ -74,6 +75,7 @@ public abstract class MessageFactory { public static final String ABORT_TXN_EVENT = "ABORT_TXN"; public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT"; public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG"; + public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT"; private static MessageFactory instance = null; @@ -326,4 +328,14 @@ public abstract class MessageFactory { public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog); public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat); + + /** + * Factory method for building acid write message + * + * + * @param acidWriteEvent information related to the acid write operation + * @param files files added by this write operation + * @return instance of AcidWriteMessage + */ + public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files); } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java new file mode 100644 index 0000000..515a2cb --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; +import org.apache.thrift.TException; +import org.codehaus.jackson.annotate.JsonProperty; +import java.util.Iterator; +import java.util.List; + +/** + * JSON implementation of AcidWriteMessage + */ +public class JSONAcidWriteMessage extends AcidWriteMessage { + + @JsonProperty + private Long txnid, writeId, timestamp; + + @JsonProperty + private String server, servicePrincipal, database, table, partition, tableObjJson, partitionObjJson; + + @JsonProperty + private List<String> files; + + /** + * Default constructor, needed for Jackson. + */ + public JSONAcidWriteMessage() { + } + + public JSONAcidWriteMessage(String server, String servicePrincipal, Long timestamp, AcidWriteEvent acidWriteEvent, + Iterator<String> files) { + this.timestamp = timestamp; + this.txnid = acidWriteEvent.getTxnId(); + this.server = server; + this.servicePrincipal = servicePrincipal; + this.database = acidWriteEvent.getDatabase(); + this.table = acidWriteEvent.getTable(); + this.writeId = acidWriteEvent.getWriteId(); + this.partition = acidWriteEvent.getPartition(); + try { + this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj()); + if (acidWriteEvent.getPartitionObj() != null) { + this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj()); + } else { + this.partitionObjJson = null; + } + } catch (TException e) { + throw new IllegalArgumentException("Could not serialize JSONAcidWriteMessage : ", e); + } + this.files = Lists.newArrayList(files); + } + + @Override + public Long getTxnId() { + return txnid; + } + + @Override + public Long getTimestamp() { + return timestamp; + } + + @Override + public String getDB() { + return database; + } + + @Override + public String getServicePrincipal() { + return servicePrincipal; + } + + @Override + public String getServer() { + return server; + } + + @Override + public String getTable() { + return table; + } + + @Override + public Long getWriteId() { + return writeId; + } + + @Override + public String getPartition() { + return partition; + } + + @Override + public List<String> getFiles() { + return files; + } + + @Override + public Table getTableObj() throws Exception { + return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class); + } + + @Override + public Partition getPartitionObj() throws Exception { + return ((partitionObjJson == null) ? null : + (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class)); + } + + @Override + public String getTableObjStr() { + return tableObjJson; + } + + @Override + public String getPartitionObjStr() { + return partitionObjJson; + } + + @Override + public String toString() { + try { + return JSONMessageDeserializer.mapper.writeValueAsString(this); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not serialize: ", exception); + } + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java index 595a3d1..6082b8e 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java @@ -18,9 +18,15 @@ */ package org.apache.hadoop.hive.metastore.messaging.json; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import org.codehaus.jackson.annotate.JsonProperty; +import java.util.List; + /** * JSON implementation of CommitTxnMessage */ @@ -38,6 +44,12 @@ public class JSONCommitTxnMessage extends CommitTxnMessage { @JsonProperty private String servicePrincipal; + @JsonProperty + private List<Long> writeIds; + + @JsonProperty + private List<String> databases, tables, partitions, tableObjs, partitionObjs, files; + /** * Default constructor, needed for Jackson. */ @@ -49,6 +61,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage { this.txnid = txnid; this.server = server; this.servicePrincipal = servicePrincipal; + this.databases = null; + this.tables = null; + this.writeIds = null; + this.partitions = null; + this.tableObjs = null; + this.partitionObjs = null; + this.files = null; } @Override @@ -77,6 +96,82 @@ public class JSONCommitTxnMessage extends CommitTxnMessage { } @Override + public List<Long> getWriteIds() { + return writeIds; + } + + @Override + public List<String> getDatabases() { + return databases; + } + + @Override + public List<String> getTables() { + return tables; + } + + @Override + public List<String> getPartitions() { + return partitions; + } + + @Override + public Table getTableObj(int idx) throws Exception { + return tableObjs == null ? null : (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class); + } + + @Override + public Partition getPartitionObj(int idx) throws Exception { + return (partitionObjs == null ? null : (partitionObjs.get(idx) == null ? null : + (Partition)JSONMessageFactory.getTObj(partitionObjs.get(idx), Partition.class))); + } + + @Override + public String getFiles(int idx) { + return files == null ? null : files.get(idx); + } + + @Override + public List<String> getFilesList() { + return files; + } + + @Override + public void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList) { + if (this.databases == null) { + this.databases = Lists.newArrayList(); + } + if (this.tables == null) { + this.tables = Lists.newArrayList(); + } + if (this.writeIds == null) { + this.writeIds = Lists.newArrayList(); + } + if (this.tableObjs == null) { + this.tableObjs = Lists.newArrayList(); + } + if (this.partitions == null) { + this.partitions = Lists.newArrayList(); + } + if (this.partitionObjs == null) { + this.partitionObjs = Lists.newArrayList(); + } + if (this.files == null) { + this.files = Lists.newArrayList(); + } + + for (WriteEventInfo writeEventInfo : writeEventInfoList) { + this.databases.add(writeEventInfo.getDatabase()); + this.tables.add(writeEventInfo.getTable()); + this.writeIds.add(writeEventInfo.getWriteId()); + this.partitions.add(writeEventInfo.getPartition()); + this.tableObjs.add(writeEventInfo.getTableObj()); + this.partitionObjs.add(writeEventInfo.getPartitionObj()); + this.files.add(writeEventInfo.getFiles()); + } + } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java index f54e24d..be6b751 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; +import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -259,4 +260,12 @@ public class JSONMessageDeserializer extends MessageDeserializer { throw new IllegalArgumentException("Could not construct AllocWriteIdMessage", e); } } + + public AcidWriteMessage getAcidWriteMessage(String messageBody) { + try { + return mapper.readValue(messageBody, JSONAcidWriteMessage.class); + } catch (Exception e) { + throw new IllegalArgumentException("Could not construct AcidWriteMessage", e); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index d64c3ff..07f51f0 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; @@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; +import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -230,11 +232,17 @@ public class JSONMessageFactory extends MessageFactory { return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now()); } + @Override public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList, String dbName, String tableName) { return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now()); } + @Override + public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) { + return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files); + } + private long now() { return System.currentTimeMillis() / 1000; } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java new file mode 100644 index 0000000..f5ca386 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.model; + +/** + * MTxnWriteNotificationLog + * DN table for ACID write events. + */ +public class MTxnWriteNotificationLog { + private long txnId; + private long writeId; + private int eventTime; + private String database; + private String table; + private String partition; + private String tableObject; + private String partObject; + private String files; + + public MTxnWriteNotificationLog() { + } + + public MTxnWriteNotificationLog(long txnId, long writeId, int eventTime, String database, String table, + String partition, String tableObject, String partObject, String files) { + this.txnId = txnId; + this.writeId = writeId; + this.eventTime = eventTime; + this.database = database; + this.table = table; + this.partition = partition; + this.tableObject = tableObject; + this.partObject = partObject; + this.files = files; + } + + public long getTxnId() { + return txnId; + } + + public void setTxnId(long txnId) { + this.txnId = txnId; + } + + public long getWriteId() { + return writeId; + } + + public void setWriteId(long writeId) { + this.writeId = writeId; + } + + public int getEventTime() { + return eventTime; + } + + public void setEventTime(int eventTime) { + this.eventTime = eventTime; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public String getPartition() { + return partition; + } + + public void setPartition(String partition) { + this.partition = partition; + } + + public String getTableObject() { + return tableObject; + } + + public void setTableObject(String tableObject) { + this.tableObject = tableObject; + } + + public String getPartObject() { + return partObject; + } + + public void setPartObject(String partObject) { + this.partObject = partObject; + } + + public String getFiles() { + return files; + } + + public void setFiles(String files) { + this.files = files; + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index b23a6d7..d0ac7db 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -175,4 +175,13 @@ public final class SQLGenerator { return dbProduct; } + // This is required for SQL executed directly. If the SQL has double quotes then some dbs tend to + // remove the escape characters and store the variable without double quote. + public String addEscapeCharacters(String s) { + if (dbProduct == DatabaseProduct.MYSQL) { + return s.replaceAll("\\\\", "\\\\\\\\"); + } + return s; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 50bfca3..f8c2ca2 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -245,6 +245,34 @@ public final class TxnDbUtil { stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\", \"NEXT_EVENT_ID\")" + " SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT " + "\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")"); + + try { + stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" + + "WNL_ID bigint NOT NULL," + + "WNL_TXNID bigint NOT NULL," + + "WNL_WRITEID bigint NOT NULL," + + "WNL_DATABASE varchar(128) NOT NULL," + + "WNL_TABLE varchar(128) NOT NULL," + + "WNL_PARTITION varchar(1024) NOT NULL," + + "WNL_TABLE_OBJ clob NOT NULL," + + "WNL_PARTITION_OBJ clob," + + "WNL_FILES clob," + + "WNL_EVENT_TIME integer NOT NULL," + + "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION))" + ); + } catch (SQLException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, ignoring"); + } else { + throw e; + } + } + + stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " + + "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " + + "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" + + ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" + + ".model.MTxnWriteNotificationLog')"); } catch (SQLException e) { try { conn.rollback(); http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 361ede5..3785f89 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; @@ -698,6 +699,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.Idempotent + public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt); + if (targetTxnIds.isEmpty()) { + LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy); + return -1; + } + assert (targetTxnIds.size() == 1); + return targetTxnIds.get(0); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + sourceTxnId + ")"); + throw new MetaException("Unable to get target transaction id " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return getTargetTxnId(replPolicy, sourceTxnId); + } + } + + @Override + @RetrySemantics.Idempotent public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException { long txnid = rqst.getTxnid(); long sourceTxnId = -1; @@ -892,10 +925,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { shouldNeverHappen(txnid); //dbConn is rolled back in finally{} } - String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + - quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; - rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); - if (rs.next()) { + + String conflictSQLSuffix = null; + if (rqst.isSetReplPolicy()) { + rs = null; + } else { + conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; + rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, + "tc_operation_type " + conflictSQLSuffix)); + } + if (rs != null && rs.next()) { isUpdateDelete = true; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict @@ -984,23 +1024,52 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn. * If RO < W, then there is no reads-from relationship. + * In replication flow we don't expect any write write conflict as it should have been handled at source. */ } - // Move the record from txn_components into completed_txn_components so that the compactor - // knows where to look to compact. - String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + - "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " + - "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid; - LOG.debug("Going to execute insert <" + s + ">"); - int modCount = 0; - if ((modCount = stmt.executeUpdate(s)) < 1) { - //this can be reasonable for an empty txn START/COMMIT or read-only txn - //also an IUD with DP that didn't match any rows. - LOG.info("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); + + String s; + if (!rqst.isSetReplPolicy()) { + // Move the record from txn_components into completed_txn_components so that the compactor + // knows where to look to compact. + s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + + "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " + + "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute insert <" + s + ">"); + + if ((stmt.executeUpdate(s)) < 1) { + //this can be reasonable for an empty txn START/COMMIT or read-only txn + //also an IUD with DP that didn't match any rows. + LOG.info("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); + } + } else { + if (rqst.isSetWriteEventInfos()) { + List<String> rows = new ArrayList<>(); + for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) { + rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," + + quoteString(writeEventInfo.getTable()) + "," + + quoteString(writeEventInfo.getPartition()) + "," + + writeEventInfo.getWriteId()); + } + List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " + + "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid)", rows); + for (String q : queries) { + LOG.debug("Going to execute insert <" + q + "> "); + stmt.execute(q); + } + } + + s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); + LOG.info("Repl going to execute <" + s + ">"); + stmt.executeUpdate(s); } + // Obtain information that we need to update registry - s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; + s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS" + + " where ctc_txnid = " + txnid; + LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -1008,27 +1077,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { txnComponents.add(new TransactionRegistryInfo(rs.getString(1), rs.getString(2), rs.getLong(3), rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime())); } + + // cleanup all txn related metadata s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - modCount = stmt.executeUpdate(s); + stmt.executeUpdate(s); s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - modCount = stmt.executeUpdate(s); + stmt.executeUpdate(s); s = "delete from TXNS where txn_id = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - modCount = stmt.executeUpdate(s); + stmt.executeUpdate(s); s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - modCount = stmt.executeUpdate(s); + stmt.executeUpdate(s); LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); - - if (rqst.isSetReplPolicy()) { - s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + - " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); - LOG.info("Repl going to execute <" + s + ">"); - stmt.executeUpdate(s); - } - if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator); @@ -1548,6 +1611,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } @Override + @RetrySemantics.Idempotent + public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) + throws MetaException { + Connection dbConn = null; + try { + try { + //Idempotent case is handled by notify Event + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, sqlGenerator); + LOG.debug("Going to commit"); + dbConn.commit(); + return; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + if (isDuplicateKeyError(e)) { + // in case of key duplicate error, retry as it might be because of race condition + if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", e.getMessage())) { + throw new RetryException(); + } + retryNum = 0; + throw new MetaException(e.getMessage()); + } + checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")"); + throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e)); + } finally{ + closeDbConn(dbConn); + unlockInternal(); + } + } catch (RetryException e) { + addWriteNotificationLog(acidWriteEvent); + } + } + + @Override @RetrySemantics.SafeToRetry public void performWriteSetGC() { Connection dbConn = null; @@ -3046,6 +3146,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { closeStmt(stmt); closeDbConn(dbConn); } + + private boolean waitForRetry(String caller, String errMsg) { + if (retryNum++ < retryLimit) { + LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval + + "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + errMsg); + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ex) { + // + } + return true; + } else { + LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + errMsg); + } + return false; + } /** * Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do * this, so we have to inspect the error messages and catch the telltale signs for each @@ -3089,18 +3205,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } else if (isRetryable(conf, e)) { //in MSSQL this means Communication Link Failure - if (retryNum++ < retryLimit) { - LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval + - "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + getMessage(e)); - try { - Thread.sleep(retryInterval); - } catch (InterruptedException ex) { - // - } - sendRetrySignal = true; - } else { - LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); - } + sendRetrySignal = waitForRetry(caller, e.getMessage()); } else { //make sure we know we saw an error that we don't recognize http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index ef447e1..d972d10 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import java.sql.SQLException; import java.util.Iterator; @@ -86,6 +87,9 @@ public interface TxnStore extends Configurable { @RetrySemantics.Idempotent OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException; + @RetrySemantics.Idempotent + long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException; + /** * Abort (rollback) a transaction. * @param rqst info on transaction to abort @@ -476,4 +480,11 @@ public interface TxnStore extends Configurable { */ @RetrySemantics.Idempotent void setHadoopJobId(String hadoopJobId, long id); + + /** + * Add the ACID write event information to writeNotificationLog table. + * @param acidWriteEvent + */ + @RetrySemantics.Idempotent + void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException; } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java index 963e12f..154db4b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java @@ -511,7 +511,6 @@ public class FileUtils { return new Path(scheme, authority, pathUri.getPath()); } - /** * Returns a BEST GUESS as to whether or not other is a subdirectory of parent. It does not * take into account any intricacies of the underlying file system, which is assumed to be @@ -524,4 +523,15 @@ public class FileUtils { public static boolean isSubdirectory(String parent, String other) { return other.startsWith(parent.endsWith(Path.SEPARATOR) ? parent : parent + Path.SEPARATOR); } + + public static Path getTransformedPath(String name, String subDir, String root) { + if (root != null) { + Path newPath = new Path(root); + if (subDir != null) { + newPath = new Path(newPath, subDir); + } + return new Path(newPath, name); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/resources/package.jdo ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo index 1be3e98..5fb548c 100644 --- a/standalone-metastore/src/main/resources/package.jdo +++ b/standalone-metastore/src/main/resources/package.jdo @@ -1182,6 +1182,41 @@ </field> </class> + <class name="MTxnWriteNotificationLog" table="TXN_WRITE_NOTIFICATION_LOG" identity-type="datastore" detachable="true"> + <datastore-identity strategy="increment"/> + <datastore-identity key-cache-size="1"/> + <datastore-identity> + <column name="WNL_ID"/> + </datastore-identity> + <field name="txnId"> + <column name="WNL_TXNID" jdbc-type="BIGINT" allows-null="false"/> + </field> + <field name="writeId"> + <column name="WNL_WRITEID" jdbc-type="BIGINT" allows-null="false"/> + </field> + <field name="database"> + <column name="WNL_DATABASE" length="128" jdbc-type="VARCHAR" allows-null="false"/> + </field> + <field name="table"> + <column name="WNL_TABLE" length="128" jdbc-type="VARCHAR" allows-null="false"/> + </field> + <field name="partition"> + <column name="WNL_PARTITION" length="1024" jdbc-type="VARCHAR" allows-null="false"/> + </field> + <field name="tableObject"> + <column name="WNL_TABLE_OBJ" jdbc-type="LONGVARCHAR"/> + </field> + <field name="partObject"> + <column name="WNL_PARTITION_OBJ" jdbc-type="LONGVARCHAR"/> + </field> + <field name="files"> + <column name="WNL_FILES" jdbc-type="LONGVARCHAR"/> + </field> + <field name="eventTime"> + <column name="WNL_EVENT_TIME" jdbc-type="INTEGER" allows-null="false"/> + </field> + </class> + <class name="MWMResourcePlan" identity-type="datastore" table="WM_RESOURCEPLAN" detachable="true"> <datastore-identity> <column name="RP_ID"/> http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql index 352b43e..a696d06 100644 --- a/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql @@ -689,6 +689,21 @@ CREATE TABLE "APP"."RUNTIME_STATS" ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ clob NOT NULL, + WNL_PARTITION_OBJ clob, + WNL_FILES clob, + WNL_EVENT_TIME integer NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +); +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql index bb69105..7cab4fb 100644 --- a/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -689,6 +689,21 @@ CREATE TABLE "APP"."RUNTIME_STATS" ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ clob NOT NULL, + WNL_PARTITION_OBJ clob, + WNL_FILES clob, + WNL_EVENT_TIME integer NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +); +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql index 7b7a8a2..10f1373 100644 --- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -244,7 +244,6 @@ CREATE TABLE MIN_HISTORY_LEVEL ( CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID); - CREATE TABLE "APP"."RUNTIME_STATS" ( "RS_ID" bigint primary key, "CREATE_TIME" integer not null, http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql index 6621ef7..7058ab0 100644 --- a/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql @@ -29,5 +29,21 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE integer; CREATE INDEX "APP"."TAB_COL_STATS_IDX" ON "APP"."TAB_COL_STATS" ("CAT_NAME", "DB_NAME", "TABLE_NAME", "COLUMN_NAME"); +-- HIVE-19267 +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ clob NOT NULL, + WNL_PARTITION_OBJ clob, + WNL_FILES clob, + WNL_EVENT_TIME integer NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +); +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- This needs to be the last thing done. Insert any changes above this line. UPDATE "APP".VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql index bc11b40..d7722dc 100644 --- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql @@ -1248,6 +1248,23 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE nvarchar(128) NOT NULL, + WNL_TABLE nvarchar(128) NOT NULL, + WNL_PARTITION nvarchar(1024) NOT NULL, + WNL_TABLE_OBJ text NOT NULL, + WNL_PARTITION_OBJ text, + WNL_FILES text, + WNL_EVENT_TIME int NOT NULL +); + +ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION); + +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index 922e8fe..a81fc40 100644 --- a/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1249,6 +1249,23 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE nvarchar(128) NOT NULL, + WNL_TABLE nvarchar(128) NOT NULL, + WNL_PARTITION nvarchar(1024) NOT NULL, + WNL_TABLE_OBJ text NOT NULL, + WNL_PARTITION_OBJ text, + WNL_FILES text, + WNL_EVENT_TIME int NOT NULL +); + +ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION); + +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql index abb80d6..41f23f7 100644 --- a/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql @@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD TXN_TYPE int NULL; CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME); +-- HIVE-19267 +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE nvarchar(128) NOT NULL, + WNL_TABLE nvarchar(128) NOT NULL, + WNL_PARTITION nvarchar(1024) NOT NULL, + WNL_TABLE_OBJ text NOT NULL, + WNL_PARTITION_OBJ text, + WNL_FILES text, + WNL_EVENT_TIME int NOT NULL +); +ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION); +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS MESSAGE; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql index c54df55..c65af1e 100644 --- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql @@ -1155,7 +1155,6 @@ CREATE TABLE REPL_TXN_MAP ( PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; - CREATE TABLE RUNTIME_STATS ( RS_ID bigint primary key, CREATE_TIME bigint NOT NULL, http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql index af955dc..29d4a43 100644 --- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql @@ -1173,6 +1173,22 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ longtext NOT NULL, + WNL_PARTITION_OBJ longtext, + WNL_FILES longtext, + WNL_EVENT_TIME INT(11) NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 6c40e6e..968f4a4 100644 --- a/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -1173,6 +1173,22 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ longtext NOT NULL, + WNL_PARTITION_OBJ longtext, + WNL_FILES longtext, + WNL_EVENT_TIME INT(11) NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql index 9b87563..786e38a 100644 --- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql @@ -319,8 +319,8 @@ UPDATE COMPLETED_TXN_COMPONENTS SET CTC_WRITEID = CTC_TXNID; ALTER TABLE TXN_COMPONENTS MODIFY COLUMN TC_TABLE varchar(128) NULL; +ALTER TABLE `TBLS` ADD COLUMN `OWNER_TYPE` VARCHAR(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL; + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS ' '; - -ALTER TABLE `TBLS` ADD COLUMN `OWNER_TYPE` VARCHAR(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql index 305fa1d..e103bef 100644 --- a/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql @@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE int DEFAULT NULL; CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME) USING BTREE; +-- HIVE-19267 +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID bigint NOT NULL, + WNL_TXNID bigint NOT NULL, + WNL_WRITEID bigint NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ longtext NOT NULL, + WNL_PARTITION_OBJ longtext, + WNL_FILES longtext, + WNL_EVENT_TIME INT(11) NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS ' '; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql index 63cc1f7..3e2e282 100644 --- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql @@ -1134,7 +1134,6 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); - -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql index bc13703..9adea31 100644 --- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql @@ -1143,6 +1143,21 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID number(19) NOT NULL, + WNL_TXNID number(19) NOT NULL, + WNL_WRITEID number(19) NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ clob NOT NULL, + WNL_PARTITION_OBJ clob, + WNL_FILES clob, + WNL_EVENT_TIME number(10) NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +); + +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index e12150a..faca669 100644 --- a/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -1143,6 +1143,21 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID number(19) NOT NULL, + WNL_TXNID number(19) NOT NULL, + WNL_WRITEID number(19) NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ clob NOT NULL, + WNL_PARTITION_OBJ clob, + WNL_FILES clob, + WNL_EVENT_TIME number(10) NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +); + +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql index ce3437f..71f5034 100644 --- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql @@ -335,8 +335,8 @@ INSERT INTO TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID, T2W_WRITEID) UPDATE TXN_COMPONENTS SET TC_WRITEID = TC_TXNID; UPDATE COMPLETED_TXN_COMPONENTS SET CTC_WRITEID = CTC_TXNID; +ALTER TABLE TBLS ADD OWNER_TYPE VARCHAR2(10) NULL; + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS Status from dual; - -ALTER TABLE TBLS ADD OWNER_TYPE VARCHAR2(10) NULL; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql index ccdea54..cf8699b 100644 --- a/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql +++ b/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql @@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD TXN_TYPE number(10) NULL; CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME); +-- HIVE-19267 +CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( + WNL_ID number(19) NOT NULL, + WNL_TXNID number(19) NOT NULL, + WNL_WRITEID number(19) NOT NULL, + WNL_DATABASE varchar(128) NOT NULL, + WNL_TABLE varchar(128) NOT NULL, + WNL_PARTITION varchar(1024) NOT NULL, + WNL_TABLE_OBJ clob NOT NULL, + WNL_PARTITION_OBJ clob, + WNL_FILES clob, + WNL_EVENT_TIME number(10) NOT NULL, + PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION) +); +INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS Status from dual; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql index 97697f8..b89c87f 100644 --- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql +++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql @@ -1812,7 +1812,6 @@ CREATE TABLE REPL_TXN_MAP ( PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID) ); - CREATE TABLE RUNTIME_STATS ( RS_ID bigint primary key, CREATE_TIME bigint NOT NULL, @@ -1822,7 +1821,6 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); - -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql index 36bab70..7a8a419 100644 --- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql +++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql @@ -1834,6 +1834,21 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" ( + "WNL_ID" bigint NOT NULL, + "WNL_TXNID" bigint NOT NULL, + "WNL_WRITEID" bigint NOT NULL, + "WNL_DATABASE" varchar(128) NOT NULL, + "WNL_TABLE" varchar(128) NOT NULL, + "WNL_PARTITION" varchar(1024) NOT NULL, + "WNL_TABLE_OBJ" text NOT NULL, + "WNL_PARTITION_OBJ" text, + "WNL_FILES" text, + "WNL_EVENT_TIME" integer NOT NULL, + PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION") +); + +INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index b73e1d1..2e7ac5a 100644 --- a/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ b/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1836,6 +1836,21 @@ CREATE TABLE RUNTIME_STATS ( CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); +CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" ( + "WNL_ID" bigint NOT NULL, + "WNL_TXNID" bigint NOT NULL, + "WNL_WRITEID" bigint NOT NULL, + "WNL_DATABASE" varchar(128) NOT NULL, + "WNL_TABLE" varchar(128) NOT NULL, + "WNL_PARTITION" varchar(1024) NOT NULL, + "WNL_TABLE_OBJ" text NOT NULL, + "WNL_PARTITION_OBJ" text, + "WNL_FILES" text, + "WNL_EVENT_TIME" integer NOT NULL, + PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION") +); + +INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql index 2c0eb31..445c3a2 100644 --- a/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql +++ b/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql @@ -32,6 +32,22 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE integer DEFAULT NULL; CREATE INDEX "TAB_COL_STATS_IDX" ON "TAB_COL_STATS" USING btree ("CAT_NAME", "DB_NAME","TABLE_NAME","COLUMN_NAME"); +-- HIVE-19267 +CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" ( + "WNL_ID" bigint NOT NULL, + "WNL_TXNID" bigint NOT NULL, + "WNL_WRITEID" bigint NOT NULL, + "WNL_DATABASE" varchar(128) NOT NULL, + "WNL_TABLE" varchar(128) NOT NULL, + "WNL_PARTITION" varchar(1024) NOT NULL, + "WNL_TABLE_OBJ" text NOT NULL, + "WNL_PARTITION_OBJ" text, + "WNL_FILES" text, + "WNL_EVENT_TIME" integer NOT NULL, + PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION") +); +INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); + -- These lines need to be last. Insert any changes above. UPDATE "VERSION" SET "SCHEMA_VERSION"='3.1.0', "VERSION_COMMENT"='Hive release version 3.1.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0'; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/thrift/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift index 6e503eb..1ca6454 100644 --- a/standalone-metastore/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift @@ -867,6 +867,18 @@ struct AbortTxnsRequest { struct CommitTxnRequest { 1: required i64 txnid, 2: optional string replPolicy, + // Information related to write operations done in this transaction. + 3: optional list<WriteEventInfo> writeEventInfos, +} + +struct WriteEventInfo { + 1: required i64 writeId, + 2: required string database, + 3: required string table, + 4: required string files, + 5: optional string partition, + 6: optional string tableObj, // repl txn task does not need table object for commit + 7: optional string partitionObj, } struct ReplTblWriteIdStateRequest { @@ -1102,6 +1114,8 @@ struct InsertEventRequestData { 2: required list<string> filesAdded, // Checksum of files (hex string of checksum byte payload) 3: optional list<string> filesAddedChecksum, + // Used by acid operation to create the sub directory + 4: optional list<string> subDirectoryList, } union FireEventRequestData { @@ -1122,7 +1136,20 @@ struct FireEventRequest { struct FireEventResponse { // NOP for now, this is just a place holder for future responses } - + +struct WriteNotificationLogRequest { + 1: required i64 txnId, + 2: required i64 writeId, + 3: required string db, + 4: required string table, + 5: required InsertEventRequestData fileInfo, + 6: optional list<string> partitionVals, +} + +struct WriteNotificationLogResponse { + // NOP for now, this is just a place holder for future responses +} + struct MetadataPpdResult { 1: optional binary metadata, 2: optional binary includeBitset @@ -2104,6 +2131,7 @@ service ThriftHiveMetastore extends fb303.FacebookService NotificationEventsCountResponse get_notification_events_count(1:NotificationEventsCountRequest rqst) FireEventResponse fire_listener_event(1:FireEventRequest rqst) void flushCache() + WriteNotificationLogResponse add_write_notification_log(WriteNotificationLogRequest rqst) // Repl Change Management api CmRecycleResponse cm_recycle(1:CmRecycleRequest request) throws(1:MetaException o1) http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index c482d28..2454479 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.thrift.TException; @@ -1195,6 +1196,16 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { } @Override + public void cleanWriteNotificationEvents(int olderThan) { + objectStore.cleanWriteNotificationEvents(olderThan); + } + + @Override + public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException { + return objectStore.getAllWriteEventInfo(txnId, dbName, tableName); + } + + @Override public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException { return null; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index d253005..9b79446 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -1199,4 +1200,13 @@ public class DummyRawStoreForJdoConnection implements RawStore { NoSuchObjectException { return null; } + + @Override + public void cleanWriteNotificationEvents(int olderThan) { + } + + @Override + public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException { + return null; + } }