Repository: hive Updated Branches: refs/heads/branch-2.1 e1cec964d -> 39ecc205e
HIVE-13392 disable speculative execution for ACID Compactor (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39ecc205 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39ecc205 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39ecc205 Branch: refs/heads/branch-2.1 Commit: 39ecc205e64cd1808bebec3ae1dc448e01c48680 Parents: e1cec96 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Jul 8 13:17:29 2016 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Jul 8 14:24:28 2016 -0700 ---------------------------------------------------------------------- .../hive/common/ValidCompactorTxnList.java | 111 +++++++++++++++++++ .../hive/metastore/txn/CompactionInfo.java | 1 + .../hadoop/hive/metastore/txn/TxnUtils.java | 1 + .../metastore/txn/ValidCompactorTxnList.java | 111 ------------------- .../txn/TestValidCompactorTxnList.java | 1 + .../hive/ql/txn/compactor/CompactorMR.java | 8 +- .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 2 +- 7 files changed, 121 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java new file mode 100644 index 0000000..ad79e2c --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.common.ValidReadTxnList; + +import java.util.Arrays; + +/** + * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. + * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it + * is committed or aborted. Additionally it will return none if there are any open transactions + * below the max transaction given, since we don't want to compact above open transactions. For + * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These + * produce the logic we need to ensure that the compactor only sees records less than the lowest + * open transaction when choosing which files to compact, but that it still ignores aborted + * records when compacting. 
+ */ +public class ValidCompactorTxnList extends ValidReadTxnList { + //TODO: refactor this - minOpenTxn is not needed if we set + // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns) + + // The minimum open transaction id + private long minOpenTxn; + + public ValidCompactorTxnList() { + super(); + minOpenTxn = -1; + } + + /** + * + * @param exceptions list of all open and aborted transactions + * @param minOpen lowest open transaction + * @param highWatermark highest committed transaction + */ + public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) { + super(exceptions, highWatermark); + minOpenTxn = minOpen; + } + + public ValidCompactorTxnList(String value) { + super(value); + } + + @Override + public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { + if (highWatermark < minTxnId) { + return RangeResponse.NONE; + } else if (minOpenTxn < 0) { + return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; + } else { + return minOpenTxn > maxTxnId ? 
RangeResponse.ALL : RangeResponse.NONE; + } + } + + @Override + public String writeToString() { + StringBuilder buf = new StringBuilder(); + buf.append(highWatermark); + buf.append(':'); + buf.append(minOpenTxn); + if (exceptions.length == 0) { + buf.append(':'); + } else { + for(long except: exceptions) { + buf.append(':'); + buf.append(except); + } + } + return buf.toString(); + } + + @Override + public void readFromString(String src) { + if (src == null || src.length() == 0) { + highWatermark = Long.MAX_VALUE; + exceptions = new long[0]; + } else { + String[] values = src.split(":"); + highWatermark = Long.parseLong(values[0]); + minOpenTxn = Long.parseLong(values[1]); + exceptions = new long[values.length - 2]; + for(int i = 2; i < values.length; ++i) { + exceptions[i-2] = Long.parseLong(values[i]); + } + } + } + + @VisibleForTesting + public long getMinOpenTxn() { + return minOpenTxn; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index 85e0885..413ce3b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore.txn; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 46348ea..39b18ac 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore.txn; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java deleted file mode 100644 index 30bdfa7..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.metastore.txn; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hive.common.ValidReadTxnList; - -import java.util.Arrays; - -/** - * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. - * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it - * is committed or aborted. Additionally it will return none if there are any open transactions - * below the max transaction given, since we don't want to compact above open transactions. For - * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These - * produce the logic we need to assure that the compactor only sees records less than the lowest - * open transaction when choosing which files to compact, but that it still ignores aborted - * records when compacting. - */ -public class ValidCompactorTxnList extends ValidReadTxnList { - //TODO: refactor this - minOpenTxn is not needed if we set - // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns) - - // The minimum open transaction id - private long minOpenTxn; - - public ValidCompactorTxnList() { - super(); - minOpenTxn = -1; - } - - /** - * - * @param exceptions list of all open and aborted transactions - * @param minOpen lowest open transaction - * @param highWatermark highest committed transaction - */ - public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) { - super(exceptions, highWatermark); - minOpenTxn = minOpen; - } - - public ValidCompactorTxnList(String value) { - super(value); - } - - @Override - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { - if (highWatermark < minTxnId) { - return RangeResponse.NONE; - } else if (minOpenTxn < 0) { - return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; - } else { - return minOpenTxn > maxTxnId ? 
RangeResponse.ALL : RangeResponse.NONE; - } - } - - @Override - public String writeToString() { - StringBuilder buf = new StringBuilder(); - buf.append(highWatermark); - buf.append(':'); - buf.append(minOpenTxn); - if (exceptions.length == 0) { - buf.append(':'); - } else { - for(long except: exceptions) { - buf.append(':'); - buf.append(except); - } - } - return buf.toString(); - } - - @Override - public void readFromString(String src) { - if (src == null || src.length() == 0) { - highWatermark = Long.MAX_VALUE; - exceptions = new long[0]; - } else { - String[] values = src.split(":"); - highWatermark = Long.parseLong(values[0]); - minOpenTxn = Long.parseLong(values[1]); - exceptions = new long[values.length - 2]; - for(int i = 2; i < values.length; ++i) { - exceptions[i-2] = Long.parseLong(values[i]); - } - } - } - - @VisibleForTesting - long getMinOpenTxn() { - return minOpenTxn; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java index c0923eb..c249854 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore.txn; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index b54a95d..6caca98 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -27,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -133,6 +133,10 @@ public class CompactorMR { overrideTblProps(job, t.getParameters(), ci.properties); } setColumnTypes(job, sd.getCols()); + //with speculative execution on, duplicate attempts of a task may conflict creating/using TMP_LOCATION and if we were + //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter + //to do the final move + job.setBoolean("mapreduce.map.speculative", false); return job; } @@ -623,7 +627,7 @@ public class CompactorMR { AcidInputFormat<WritableComparable, V> aif = instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); ValidTxnList txnList = - new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); boolean isMajor = jobConf.getBoolean(IS_MAJOR, false); AcidInputFormat.RawReader<V> reader = http://git-wip-us.apache.org/repos/asf/hive/blob/39ecc205/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 83a2ba3..5745dee 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;