http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index f7e3e3a..6f96e1d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; @@ -56,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; @@ -86,23 +88,17 @@ public class DbNotificationListener extends MetaStoreEventListener { // HiveConf rather than a Configuration. private HiveConf hiveConf; private MessageFactory msgFactory; - private RawStore rs; - - private synchronized void init(HiveConf conf) { - try { - rs = RawStoreProxy.getProxy(conf, conf, - conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999); - } catch (MetaException e) { - LOG.error("Unable to connect to raw store, notifications will not be tracked", e); - rs = null; - } - if (cleaner == null && rs != null) { - cleaner = new CleanerThread(conf, rs); + + private synchronized void init(HiveConf conf) throws MetaException { + if (cleaner == null) { + cleaner = + new CleanerThread(conf, RawStoreProxy.getProxy(conf, conf, + conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), 999999)); cleaner.start(); } } - public DbNotificationListener(Configuration config) { + public DbNotificationListener(Configuration config) throws MetaException { super(config); // The code in MetastoreUtils.getMetaStoreListeners() that calls this looks for a constructor // with a Configuration parameter, so we have to declare config as Configuration. 
But it @@ -142,7 +138,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, tableEvent); } /** @@ -157,7 +153,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildDropTableMessage(t).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, tableEvent); } /** @@ -170,10 +166,10 @@ public class DbNotificationListener extends MetaStoreEventListener { Table after = tableEvent.getNewTable(); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory - .buildAlterTableMessage(before, after).toString()); + .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); - process(event); + process(event, tableEvent); } class FileIterator implements Iterator<String> { @@ -281,7 +277,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, partitionEvent); } /** @@ -296,7 +292,7 @@ public class DbNotificationListener extends MetaStoreEventListener { .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString()); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - process(event); + process(event, partitionEvent); } /** @@ -309,10 +305,10 @@ public class DbNotificationListener extends MetaStoreEventListener { Partition after = partitionEvent.getNewPartition(); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory - .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString()); + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp()).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); - process(event); + process(event, partitionEvent); } /** @@ -326,7 +322,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory .buildCreateDatabaseMessage(db).toString()); event.setDbName(db.getName()); - process(event); + process(event, dbEvent); } /** @@ -340,7 +336,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory .buildDropDatabaseMessage(db).toString()); event.setDbName(db.getName()); - process(event); + process(event, dbEvent); } /** @@ -354,7 +350,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory .buildCreateFunctionMessage(fn).toString()); event.setDbName(fn.getDbName()); - process(event); + process(event, fnEvent); } /** @@ -368,7 +364,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory .buildDropFunctionMessage(fn).toString()); event.setDbName(fn.getDbName()); - process(event); + process(event, fnEvent); } /** @@ -382,7 +378,7 @@ public class DbNotificationListener extends 
MetaStoreEventListener { new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory .buildCreateIndexMessage(index).toString()); event.setDbName(index.getDbName()); - process(event); + process(event, indexEvent); } /** @@ -396,7 +392,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory .buildDropIndexMessage(index).toString()); event.setDbName(index.getDbName()); - process(event); + process(event, indexEvent); } /** @@ -411,7 +407,7 @@ public class DbNotificationListener extends MetaStoreEventListener { new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory .buildAlterIndexMessage(before, after).toString()); event.setDbName(before.getDbName()); - process(event); + process(event, indexEvent); } class FileChksumIterator implements Iterator<String> { @@ -443,12 +439,12 @@ public class DbNotificationListener extends MetaStoreEventListener { public void onInsert(InsertEvent insertEvent) throws MetaException { NotificationEvent event = new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage( - insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), + insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(), insertEvent.isReplace(), new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums())) .toString()); event.setDbName(insertEvent.getDb()); event.setTableName(insertEvent.getTable()); - process(event); + process(event, insertEvent); } /** @@ -472,18 +468,27 @@ public class DbNotificationListener extends MetaStoreEventListener { return (int)millis; } - // Process this notification by adding it to metastore DB - private void process(NotificationEvent event) { + /** + * Process this notification by adding it to metastore DB. + * + * @param event NotificationEvent is the object written to the metastore DB. + * @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the + * DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners. + */ + private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException { event.setMessageFormat(msgFactory.getMessageFormat()); - if (rs != null) { - synchronized (NOTIFICATION_TBL_LOCK) { - LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), - event.getMessage()); - rs.addNotificationEvent(event); - } - } else { - LOG.warn("Dropping event " + event + " since notification is not running."); + synchronized (NOTIFICATION_TBL_LOCK) { + LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(), + event.getMessage()); + HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event); } + + // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. + if (event.isSetEventId()) { + listenerEvent.putParameter( + MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, + Long.toString(event.getEventId())); + } } private static class CleanerThread extends Thread {
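The putParameter() call introduced above is what lets listeners that run after DbNotificationListener correlate a metastore event with its notification-log entry. A minimal sketch of such a consumer, assuming it is registered after DbNotificationListener in hive.metastore.event.listeners and that ListenerEvent exposes the stored parameters through a getParameters() map accessor (only putParameter() appears in this patch; the listener class name is hypothetical):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
    import org.apache.hive.hcatalog.listener.MetaStoreEventListenerConstants;

    public class EventIdEchoListener extends MetaStoreEventListener {
      public EventIdEchoListener(Configuration config) {
        super(config);
      }

      @Override
      public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
        // Assumed accessor for the parameters written via putParameter().
        Map<String, String> params = tableEvent.getParameters();
        String eventId =
            params.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME);
        if (eventId != null) {
          // Correlate this DDL operation with its NOTIFICATION_LOG row.
          System.out.println("DB notification event id: " + eventId);
        }
      }
    }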
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java new file mode 100644 index 0000000..a4f2d59 --- /dev/null +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.listener; + +/** + * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent + * parameters. + */ +public class MetaStoreEventListenerConstants { + /* + * DbNotificationListener keys reserved for updating ListenerEvent parameters. + * + * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener + * processed during an event. This event identifier might be shared + * across other MetaStoreEventListener implementations. + */ + public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME"; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/pom.xml b/hcatalog/streaming/pom.xml index e765305..5bea0a6 100644 --- a/hcatalog/streaming/pom.xml +++ b/hcatalog/streaming/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java new file mode 100644 index 0000000..78987ab --- /dev/null +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.hcatalog.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.RegexSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Text; + +/** + * Streaming Writer handles text input data with regex. Uses + * org.apache.hadoop.hive.serde2.RegexSerDe + */ +public class StrictRegexWriter extends AbstractRecordWriter { + private RegexSerDe serde; + private final StructObjectInspector recordObjInspector; + private final ObjectInspector[] bucketObjInspectors; + private final StructField[] bucketStructFields; + + /** + * @param endPoint the end point to write to + * @param conn connection this Writer is to be used with + * @throws ConnectionError + * @throws SerializationError + * @throws StreamingException + */ + public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn) + throws ConnectionError, SerializationError, StreamingException { + this(null, endPoint, null, conn); + } + + /** + * @param endPoint the end point to write to + * @param conf a Hive conf object. Should be null if not using advanced Hive settings. + * @param conn connection this Writer is to be used with + * @throws ConnectionError + * @throws SerializationError + * @throws StreamingException + */ + public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn) + throws ConnectionError, SerializationError, StreamingException { + this(null, endPoint, conf, conn); + } + + /** + * @param regex to parse the data + * @param endPoint the end point to write to + * @param conf a Hive conf object. Should be null if not using advanced Hive settings. 
+ * @param conn connection this Writer is to be used with + * @throws ConnectionError + * @throws SerializationError + * @throws StreamingException + */ + public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn) + throws ConnectionError, SerializationError, StreamingException { + super(endPoint, conf, conn); + this.serde = createSerde(tbl, conf, regex); + // get ObjInspectors for entire record and bucketed cols + try { + recordObjInspector = ( StructObjectInspector ) serde.getObjectInspector(); + this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector); + } catch (SerDeException e) { + throw new SerializationError("Unable to get ObjectInspector for bucket columns", e); + } + + // get StructFields for bucketed cols + bucketStructFields = new StructField[bucketIds.size()]; + List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs(); + for (int i = 0; i < bucketIds.size(); i++) { + bucketStructFields[i] = allFields.get(bucketIds.get(i)); + } + } + + @Override + public AbstractSerDe getSerde() { + return serde; + } + + @Override + protected StructObjectInspector getRecordObjectInspector() { + return recordObjInspector; + } + + @Override + protected StructField[] getBucketStructFields() { + return bucketStructFields; + } + + @Override + protected ObjectInspector[] getBucketObjectInspectors() { + return bucketObjInspectors; + } + + + @Override + public void write(long transactionId, byte[] record) + throws StreamingIOFailure, SerializationError { + try { + Object encodedRow = encode(record); + int bucket = getBucket(encodedRow); + getRecordUpdater(bucket).insert(transactionId, encodedRow); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction(" + + transactionId + ")", e); + } + } + + /** + * Creates RegexSerDe + * @param tbl used to create serde + * @param conf used to create serde + * @param regex used to create serde + * @return + * @throws SerializationError if serde could not be initialized + */ + private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex) + throws SerializationError { + try { + Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); + tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex); + ArrayList<String> tableColumns = getCols(tbl); + tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ",")); + RegexSerDe serde = new RegexSerDe(); + SerDeUtils.initializeSerDe(serde, conf, tableProps, null); + return serde; + } catch (SerDeException e) { + throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e); + } + } + + private static ArrayList<String> getCols(Table table) { + List<FieldSchema> cols = table.getSd().getCols(); + ArrayList<String> colNames = new ArrayList<String>(cols.size()); + for (FieldSchema col : cols) { + colNames.add(col.getName().toLowerCase()); + } + return colNames; + } + + /** + * Encode Utf8 encoded string bytes using RegexSerDe + * + * @param utf8StrRecord + * @return The encoded object + * @throws SerializationError + */ + @Override + public Object encode(byte[] utf8StrRecord) throws SerializationError { + try { + Text blob = new Text(utf8StrRecord); + return serde.deserialize(blob); + } catch (SerDeException e) { + throw new SerializationError("Unable to convert byte[] record into Object", e); + } + } + +} 
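A minimal usage sketch for the new writer, mirroring the test added below; the metastore URI, database, and table names are placeholders, and the regex must have one capture group per column of the target table:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;
    import org.apache.hive.hcatalog.streaming.StrictRegexWriter;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    public class RegexStreamingExample {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Placeholder endpoint; partitionVals is null for an unpartitioned table.
        HiveEndPoint endPt =
            new HiveEndPoint("thrift://localhost:9083", "testdb", "alerts", null);
        StreamingConnection connection =
            endPt.newConnection(true, conf, null, "regex-example-agent");
        // "1,Hello streaming" parses into two columns via the two capture groups.
        StrictRegexWriter writer =
            new StrictRegexWriter("([^,]*),(.*)", endPt, conf, connection);
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        try {
          txnBatch.beginNextTransaction();
          txnBatch.write("1,Hello streaming".getBytes());
          txnBatch.commit();
        } finally {
          txnBatch.close();
          connection.close();
        }
      }
    }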
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index bf29993..097de9b 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -64,10 +64,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.orc.impl.OrcAcidUtils; -import org.apache.orc.tools.FileDump; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; @@ -82,11 +78,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; +import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.orc.impl.OrcAcidUtils; +import org.apache.orc.tools.FileDump; import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; @@ -485,9 +485,9 @@ public class TestStreaming { NullWritable key = rr.createKey(); OrcStruct value = rr.createValue(); - for (int i = 0; i < records.length; i++) { + for (String record : records) { Assert.assertEquals(true, rr.next(key, value)); - Assert.assertEquals(records[i], value.toString()); + Assert.assertEquals(record, value.toString()); } Assert.assertEquals(false, rr.next(key, value)); } @@ -741,7 +741,7 @@ public class TestStreaming { txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -753,11 +753,11 @@ public class TestStreaming { txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -787,6 +787,75 @@ public class TestStreaming { } @Test + public void testTransactionBatchCommit_Regex() throws Exception { + testTransactionBatchCommit_Regex(null); + } + @Test + public void testTransactionBatchCommit_RegexUGI() throws Exception { + testTransactionBatchCommit_Regex(Utils.getUGI()); + } + private void 
testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, + partitionVals); + StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); + String regex = "([^,]*),(.*)"; + StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection); + + // 1st Txn + TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1,Hello streaming".getBytes()); + txnBatch.commit(); + + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + + // 2nd Txn + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("2,Welcome to streaming".getBytes()); + + // data should not be visible + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); + + txnBatch.commit(); + + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); + + txnBatch.close(); + Assert.assertEquals(TransactionBatch.TxnState.INACTIVE + , txnBatch.getCurrentTransactionState()); + + + connection.close(); + + + // To Unpartitioned table + endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); + connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); + regex = "([^:]*):(.*)"; + writer = new StrictRegexWriter(regex, endPt, conf, connection); + + // 1st Txn + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + Assert.assertEquals(TransactionBatch.TxnState.OPEN + , txnBatch.getCurrentTransactionState()); + txnBatch.write("1:Hello streaming".getBytes()); + txnBatch.commit(); + + Assert.assertEquals(TransactionBatch.TxnState.COMMITTED + , txnBatch.getCurrentTransactionState()); + connection.close(); + } + + @Test public void testTransactionBatchCommit_Json() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); @@ -802,7 +871,7 @@ public class TestStreaming { txnBatch.write(rec1.getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -929,7 +998,7 @@ public class TestStreaming { txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -948,13 +1017,13 @@ public class TestStreaming { txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); txnBatch.beginNextTransaction(); txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -965,14 +1034,14 @@ public class TestStreaming { txnBatch.write("3,Hello streaming - once 
again".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch.beginNextTransaction(); txnBatch.write("4,Welcome to streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}"); @@ -1009,11 +1078,11 @@ public class TestStreaming { txnBatch2.commit(); - checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}"); txnBatch1.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); txnBatch1.beginNextTransaction(); txnBatch1.write("2,Welcome to streaming".getBytes()); @@ -1021,17 +1090,17 @@ public class TestStreaming { txnBatch2.beginNextTransaction(); txnBatch2.write("4,Welcome to streaming - once again".getBytes()); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); txnBatch1.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch2.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}"); @@ -1700,7 +1769,7 @@ public class TestStreaming { txnBatch.heartbeat();//this is no-op on closed batch txnBatch.abort();//ditto GetOpenTxnsInfoResponse r = msClient.showTxns(); - Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark()); + Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark()); List<TxnInfo> ti = r.getOpen_txns(); Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); @@ -1764,7 +1833,7 @@ public class TestStreaming { expectedEx != null && expectedEx.getMessage().contains("has been closed()")); r = msClient.showTxns(); - Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark()); + Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); @@ -1787,7 +1856,7 @@ public class TestStreaming { expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); r = msClient.showTxns(); - Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark()); + Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); 
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/java-client/pom.xml ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml index 3b53664..3bb9f4d 100644 --- a/hcatalog/webhcat/java-client/pom.xml +++ b/hcatalog/webhcat/java-client/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java index b9cb067..86d3acb 100644 --- a/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java +++ b/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java @@ -434,7 +434,7 @@ public class TestHCatClient { HCatClient client = HCatClient.create(new Configuration(hcatConf)); boolean isExceptionCaught = false; // Table creation with a long table name causes ConnectionFailureException - final String tableName = "Temptable" + new BigInteger(200, new Random()).toString(2); + final String tableName = "Temptable" + new BigInteger(260, new Random()).toString(2); ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>(); cols.add(new HCatFieldSchema("id", Type.INT, "id columns")); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/pom.xml ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/pom.xml b/hcatalog/webhcat/svr/pom.xml index c5ad387..a55ffe9 100644 --- a/hcatalog/webhcat/svr/pom.xml +++ b/hcatalog/webhcat/svr/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> @@ -45,9 +45,50 @@ <artifactId>hive-hcatalog-core</artifactId> <version>${project.version}</version> <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-runner</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-sslengine</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + </exclusions> </dependency> <!-- inter-project --> <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-rewrite</artifactId> + <version>${jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>${jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + 
<artifactId>jetty-servlet</artifactId> + <version>${jetty.version}</version> + </dependency> + <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-core</artifactId> <version>${jersey.version}</version> @@ -93,11 +134,6 @@ <version>${jackson.version}</version> </dependency> <dependency> - <groupId>org.eclipse.jetty.aggregate</groupId> - <artifactId>jetty-all-server</artifactId> - <version>${jetty.version}</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> <version>${slf4j.version}</version> @@ -107,7 +143,7 @@ <artifactId>hadoop-auth</artifactId> <version>${hadoop.version}</version> <exclusions> - <exclusion> + <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> @@ -121,16 +157,42 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> </dependency> <!-- test inter-project --> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 54d0907..0ea7d88 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -111,6 +111,43 @@ public class AppConfig extends Configuration { public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb"; public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder"; + /* + * These parameters control the maximum number of concurrent job submit/status/list + * operations in the templeton service. If more concurrent requests arrive than this, + * they will be rejected with BusyException. + */ + public static final String JOB_SUBMIT_MAX_THREADS = "templeton.parallellism.job.submit"; + public static final String JOB_STATUS_MAX_THREADS = "templeton.parallellism.job.status"; + public static final String JOB_LIST_MAX_THREADS = "templeton.parallellism.job.list"; + + /* + * These parameters control the maximum time a job submit/status/list operation may + * execute in the templeton service. On time out, the execution is interrupted and + * TimeoutException is returned to the client. + * For list and status operations, no action is needed on time out, as they are read requests.
+ * For the submit operation, we make a best effort to kill the job if it was generated. Enabling + * this parameter may have the following side effects: + * 1) The job may remain active for some time after the client gets the + * response for the submit operation, so a list operation from the client could + * show the newly created job, which may eventually be killed with no guarantees. + * 2) If the submit operation is retried by the client, duplicate + * jobs may be triggered. + * + * Time out configs should be configured in seconds. + * + */ + public static final String JOB_SUBMIT_TIMEOUT = "templeton.job.submit.timeout"; + public static final String JOB_STATUS_TIMEOUT = "templeton.job.status.timeout"; + public static final String JOB_LIST_TIMEOUT = "templeton.job.list.timeout"; + + /* + * If a task execution time out is configured for the submit operation, the job may need to + * be killed on execution time out. These parameters control the maximum number of + * retries and the retry wait time in seconds for executing the time out task. + */ + public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = "templeton.job.timeout.task.retry.count"; + public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = "templeton.job.timeout.task.retry.interval"; + /** * see webhcat-default.xml */ http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java index 4b2dfec..622f92d 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java @@ -24,6 +24,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.security.UserGroupInformation; @@ -41,10 +42,11 @@ public class DeleteDelegator extends TempletonDelegator { public QueueStatusBean run(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { - UserGroupInformation ugi = UgiFactory.getUgi(user); + UserGroupInformation ugi = null; WebHCatJTShim tracker = null; JobState state = null; try { + ugi = UgiFactory.getUgi(user); tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) @@ -69,6 +71,8 @@ public class DeleteDelegator extends TempletonDelegator { tracker.close(); if (state != null) state.close(); + if (ugi != null) + FileSystem.closeAllForUGI(ugi); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index f0296cb..1953028 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator { String statusdir, String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException + ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index 84cd5b9..1246b40 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator { boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; List<String> args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java new file mode 100644 index 0000000..e703eff --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class JobCallable<T> implements Callable<T> { + private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class); + + static public enum JobState { + STARTED, + FAILED, + COMPLETED + } + + /* + * Job state of job request. Changes to the state are synchronized using + * setStateAndResult. 
This is required due to two different threads, + * main thread and job execute thread, tries to change state and organize + * clean up tasks. + */ + private JobState jobState = JobState.STARTED; + + /* + * Result of JobCallable task after successful task completion. This is + * expected to be set by the thread which executes JobCallable task. + */ + public T returnResult = null; + + /* + * Sets the job state to FAILED. Returns true if FAILED status is set. + * Otherwise, it returns false. + */ + public boolean setJobStateFailed() { + return setStateAndResult(JobState.FAILED, null); + } + + /* + * Sets the job state to COMPLETED and also sets the results value. Returns true + * if COMPLETED status is set. Otherwise, it returns false. + */ + public boolean setJobStateCompleted(T result) { + return setStateAndResult(JobState.COMPLETED, result); + } + + /* + * Sets the job state and result. Returns true if status and result are set. + * Otherwise, it returns false. + */ + private synchronized boolean setStateAndResult(JobState jobState, T result) { + if (this.jobState == JobState.STARTED) { + this.jobState = jobState; + this.returnResult = result; + return true; + } else { + LOG.info("Failed to set job state to " + jobState + " due to job state " + + this.jobState + ". Expected state is " + JobState.STARTED); + } + + return false; + } + + /* + * Executes the callable task with help of execute() call and gets the result + * of the task. It also sets job status as COMPLETED if state is not already + * set to FAILED and returns result to future. + */ + public T call() throws Exception { + + /* + * Don't catch any execution exceptions here and let the caller catch it. + */ + T result = this.execute(); + + if (!this.setJobStateCompleted(result)) { + /* + * Failed to set job status as COMPLETED which mean the main thread would have + * exited and not waiting for the result. Call cleanup() to execute any cleanup. + */ + cleanup(); + return null; + } + + return this.returnResult; + } + + /* + * Abstract method to be overridden for task execution. + */ + public abstract T execute() throws Exception; + + /* + * Cleanup method called to run cleanup tasks if job state is FAILED. By default, + * no cleanup is provided. + */ + public void cleanup() {} +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java new file mode 100644 index 0000000..9ac4588 --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobRequestExecutor<T> { + private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class); + private static AppConfig appConf = Main.getAppConfigInstance(); + + /* + * Thread pool to execute job requests. + */ + private ThreadPoolExecutor jobExecutePool = null; + + /* + * Type of job request. + */ + private JobRequestType requestType; + + /* + * Config name used to find the number of concurrent requests. + */ + private String concurrentRequestsConfigName; + + /* + * Config name used to find the maximum time job request can be executed. + */ + private String jobTimeoutConfigName; + + /* + * Job request execution time out in seconds. If it is 0 then request + * will not be timed out. + */ + private int requestExecutionTimeoutInSec = 0; + + /* + * Amount of time a thread can stay alive in the thread pool before being cleaned up. Core threads + * will not be cleaned up from the thread pool. + */ + private int threadKeepAliveTimeInHours = 1; + + /* + * Maximum number of times a cancel request is sent to job request execution + * thread. Future.cancel may not be able to interrupt the thread if it is + * blocked on network calls. + */ + private int maxTaskCancelRetryCount = 10; + + /* + * Wait time in milliseconds before another cancel request is made. + */ + private int maxTaskCancelRetryWaitTimeInMs = 1000; + + /* + * A flag to indicate whether to cancel the task when a TimeoutException or + * InterruptedException or CancellationException is raised. The default is to cancel the task. + */ + private boolean enableCancelTask = true; + + /* + * Job Request type. + */ + public enum JobRequestType { + Submit, + Status, + List + } + + /* + * Creates a job request object and sets up the execution environment. Creates a thread pool + * to execute job requests. + * + * @param requestType + * Job request type + * + * @param concurrentRequestsConfigName + * Config name to be used to extract the number of concurrent requests to be serviced. + * + * @param jobTimeoutConfigName + * Config name to be used to extract the maximum time a task can execute a request. + * + * @param enableCancelTask + * A flag to indicate whether to cancel the task when a TimeoutException + * or InterruptedException or CancellationException is raised.
+ * + */ + public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName, + String jobTimeoutConfigName, boolean enableCancelTask) { + + this.concurrentRequestsConfigName = concurrentRequestsConfigName; + this.jobTimeoutConfigName = jobTimeoutConfigName; + this.requestType = requestType; + this.enableCancelTask = enableCancelTask; + + /* + * The default number of threads is 0, which means the thread pool is not used and + * the operation is executed in the current thread. + */ + int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ? + appConf.getInt(concurrentRequestsConfigName, 0) : 0; + + if (threads > 0) { + /* + * Create a thread pool with no queue wait time to execute the operation. This will ensure + * that job requests are rejected if the maximum number of threads are already busy. + */ + this.jobExecutePool = new ThreadPoolExecutor(threads, threads, + threadKeepAliveTimeInHours, TimeUnit.HOURS, + new SynchronousQueue<Runnable>()); + this.jobExecutePool.allowCoreThreadTimeOut(true); + + /* + * Get the job request time out value. If this configuration value is set to 0 + * then the job request will wait until it finishes. + */ + if (!StringUtils.isEmpty(jobTimeoutConfigName)) { + this.requestExecutionTimeoutInSec = appConf.getInt(jobTimeoutConfigName, 0); + } + + LOG.info("Configured " + threads + " threads for job request type " + this.requestType + + " with time out " + this.requestExecutionTimeoutInSec + " s."); + } else { + /* + * If no threads are configured then requests will be executed in the current thread itself. + */ + LOG.info("No thread pool configured for job request type " + this.requestType); + } + } + + /* + * Creates a job request object and sets up the execution environment. Creates a thread pool + * to execute job requests. + * + * @param requestType + * Job request type + * + * @param concurrentRequestsConfigName + * Config name to be used to extract the number of concurrent requests to be serviced. + * + * @param jobTimeoutConfigName + * Config name to be used to extract the maximum time a task can execute a request. + * + */ + public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName, + String jobTimeoutConfigName) { + this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, true); + } + + /* + * Returns true if the thread pool is created and can be used for executing a job request. + * Otherwise, returns false. + */ + public boolean isThreadPoolEnabled() { + return this.jobExecutePool != null; + } + + /* + * Executes the job request operation. If the thread pool is not created then the job request is + * executed in the current thread itself. + * + * @param jobExecuteCallable + * Callable object to run the job request task. + * + */ + public T execute(JobCallable<T> jobExecuteCallable) throws InterruptedException, + TimeoutException, TooManyRequestsException, ExecutionException { + /* + * The callable must not be null, and the thread pool must be configured + * to execute requests. + */ + assert (jobExecuteCallable != null); + assert (this.jobExecutePool != null); + + String type = this.requestType.toString().toLowerCase(); + + String retryMessageForConcurrentRequests = "Please wait for some time before retrying " + + "the operation.
Please refer to the config " + concurrentRequestsConfigName + + " to configure concurrent requests."; + + LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeoutInSec + + " seconds."); + Future<T> future = null; + + try { + future = this.jobExecutePool.submit(jobExecuteCallable); + } catch (RejectedExecutionException rejectedException) { + /* + * Not able to find a thread to execute the job request. Raise a busy exception so the client + * can retry the operation. + */ + String tooManyRequestsExceptionMessage = "Unable to service the " + type + " job request as " + + "templeton service is busy with too many " + type + " job requests. " + + retryMessageForConcurrentRequests; + + LOG.warn(tooManyRequestsExceptionMessage); + throw new TooManyRequestsException(tooManyRequestsExceptionMessage); + } + + T result = null; + + try { + result = this.requestExecutionTimeoutInSec > 0 + ? future.get(this.requestExecutionTimeoutInSec, TimeUnit.SECONDS) : future.get(); + } catch (TimeoutException e) { + /* + * See if the execution thread has just completed the operation and the result is available. + * If the result is available then return it. Otherwise, raise an exception. + */ + if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) { + String message = this.requestType + " job request got timed out. Please wait for some time " + + "before retrying the operation. Please refer to the config " + + jobTimeoutConfigName + " to configure job request time out."; + LOG.warn(message); + + /* + * Throw TimeoutException to caller. + */ + throw new TimeoutException(message); + } + } catch (InterruptedException e) { + /* + * See if the execution thread has just completed the operation and the result is available. + * If the result is available then return it. Otherwise, raise an exception. + */ + if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) { + String message = this.requestType + " job request got interrupted. Please wait for some time " + + "before retrying the operation."; + LOG.warn(message); + + /* + * Throw InterruptedException to caller. + */ + throw new InterruptedException(message); + } + } catch (CancellationException e) { + /* + * See if the execution thread has just completed the operation and the result is available. + * If the result is available then return it. Otherwise, raise an exception. + */ + if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) { + String message = this.requestType + " job request got cancelled and thread got interrupted. " + + "Please wait for some time before retrying the operation."; + LOG.warn(message); + + throw new InterruptedException(message); + } + } finally { + /* + * If the thread is still active and needs to be cancelled then cancel it. This may + * happen in case the task got interrupted or timed out. + */ + if (enableCancelTask) { + cancelExecutePoolThread(future); + } + } + + LOG.debug("Completed " + type + " job request."); + + return result; + } + + /* + * Initiate cancel request to cancel the thread execution and interrupt the thread. + * If thread interruption is not handled by jobExecuteCallable then the thread may continue + * running to completion. The cancel call may fail for some scenarios. In that case, + * retry the cancel call until it returns true or the max retry count is reached. + * + * @param future + * Future object which has the handle to cancel the thread.
+   *
+   */
+  private void cancelExecutePoolThread(Future<T> future) {
+    int retryCount = 0;
+    while (retryCount < this.maxTaskCancelRetryCount && !future.isDone()) {
+      LOG.info("Task is still executing the job request. Cancelling it with retry count: "
+             + retryCount);
+      if (future.cancel(true)) {
+        /*
+         * Cancelled the job request; return to the client.
+         */
+        LOG.info("Cancel job request issued successfully.");
+        return;
+      }
+
+      retryCount++;
+      try {
+        Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs);
+      } catch (InterruptedException e) {
+        /*
+         * Nothing to do. Just retry.
+         */
+      }
+    }
+
+    LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled()
+           + " Retry count: " + retryCount);
+  }
+
+  /*
+   * Tries to get the job result if the job request has completed. Otherwise sets the job
+   * status to FAILED so that the execute thread can do the necessary clean up based on the
+   * FAILED state.
+   */
+  private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> jobExecuteCallable) {
+    if (!jobExecuteCallable.setJobStateFailed()) {
+      LOG.info("Job is already COMPLETED. Returning the result.");
+      return jobExecuteCallable.returnResult;
+    } else {
+      LOG.info("Job status set to FAILED. Job clean up to be done by execute thread "
+             + "after job request is executed.");
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index b3f44a2..9bea897 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -23,16 +23,19 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.templeton.tool.JobState;
 import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
@@ -50,9 +53,26 @@ public class LauncherDelegator extends TempletonDelegator {
   static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
   private boolean secureMeatastoreAccess = false;
   private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
+  private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute";
+  private final int jobTimeoutTaskRetryCount;
+  private final int jobTimeoutTaskRetryIntervalInSec;
+
+  /**
+   * Name of the submitting thread, recorded so that execution threads can include it in
+   * their own thread names.
+   */
+  private final String submitThreadId = Thread.currentThread().getName();
+
+  /**
+   * Job request executor to submit job requests.
+   */
+  private static JobRequestExecutor<EnqueueBean> jobRequest =
+    new JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit,
+                 AppConfig.JOB_SUBMIT_MAX_THREADS, AppConfig.JOB_SUBMIT_TIMEOUT, false);

   public LauncherDelegator(AppConfig appConf) {
     super(appConf);
+    jobTimeoutTaskRetryCount = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0);
+    jobTimeoutTaskRetryIntervalInSec = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0);
   }

   public void registerJob(String id, String user, String callback,
@@ -70,19 +90,102 @@ public class LauncherDelegator extends TempletonDelegator {
     }
   }

+  /*
+   * Submits a job request. If maximum concurrent job submit requests are configured then the
+   * submit request is executed on a thread from the thread pool. If a job submit request
+   * time out is configured then the request execution thread is interrupted when it times
+   * out. Also makes a best effort to identify whether the job was submitted and, if so, to
+   * kill it quietly.
+   */
+  public EnqueueBean enqueueController(final String user, final Map<String, Object> userArgs,
+                 final String callback, final List<String> args)
+    throws NotAuthorizedException, BusyException, IOException, QueueException, TooManyRequestsException {
+
+    EnqueueBean bean = null;
+    final TempletonControllerJob controllerJob = getTempletonController();
+
+    if (jobRequest.isThreadPoolEnabled()) {
+      JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, userArgs, callback,
+                 args, controllerJob);
+      try {
+        bean = jobRequest.execute(jobExecuteCallable);
+      } catch (TimeoutException ex) {
+        /*
+         * Job request timed out. Job kill should have started. Return to the client with
+         * QueueException.
+         */
+        throw new QueueException(ex.getMessage());
+      } catch (InterruptedException ex) {
+        /*
+         * Job request got interrupted. Job kill should have started. Return to the client
+         * with QueueException.
+         */
+        throw new QueueException(ex.getMessage());
+      } catch (ExecutionException ex) {
+        /*
+         * ExecutionException is raised if the job execution gets an exception. Return to
+         * the client with the exception.
+         */
+        throw new QueueException(ex.getMessage());
+      }
+    } else {
+      LOG.info("No thread pool configured for submit job request. Executing "
+             + "the job request in current thread.");
+
+      bean = enqueueJob(user, userArgs, callback, args, controllerJob);
+    }
+
+    return bean;
+  }
+
+  /*
+   * Job callable task for the job submit operation. Overrides the behavior of execute()
+   * to submit the job. Also overrides the behavior of cleanup() to kill the job in case
+   * the job submission request is timed out or interrupted.
+   */
+  private JobCallable<EnqueueBean> getJobSubmitTask(final String user,
+                 final Map<String, Object> userArgs, final String callback,
+                 final List<String> args, final TempletonControllerJob controllerJob) {
+    return new JobCallable<EnqueueBean>() {
+      @Override
+      public EnqueueBean execute() throws NotAuthorizedException, BusyException, IOException,
+                 QueueException {
+        /*
+         * Change the current thread name to include the parent thread id if it is executed
+         * in the thread pool. Useful to extract logs specific to a job request and helpful
+         * to debug job issues.
+         */
+        Thread.currentThread().setName(String.format("%s-%s-%s", JOB_SUBMIT_EXECUTE_THREAD_PREFIX,
+                 submitThreadId, Thread.currentThread().getId()));
+
+        return enqueueJob(user, userArgs, callback, args, controllerJob);
+      }
+
+      @Override
+      public void cleanup() {
+        /*
+         * Failed to set the job status to COMPLETED, which means the main thread has exited
+         * and is no longer waiting for the result. Kill the submitted job.
+         */
+        LOG.info("Job kill not done by main thread. Trying to kill now.");
+        killTempletonJobWithRetry(user, controllerJob.getSubmittedId());
+      }
+    };
+  }
+
   /**
    * Enqueue the TempletonControllerJob directly calling doAs.
    */
-  public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback,
-                 List<String> args)
+  public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, String callback,
+                 List<String> args, TempletonControllerJob controllerJob)
     throws NotAuthorizedException, BusyException, IOException, QueueException {
+    UserGroupInformation ugi = null;
     try {
-      UserGroupInformation ugi = UgiFactory.getUgi(user);
+      ugi = UgiFactory.getUgi(user);
       final long startTime = System.nanoTime();
-      String id = queueAsUser(ugi, args);
+      String id = queueAsUser(ugi, args, controllerJob);
       long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
       LOG.debug("queued job " + id + " in " + elapsed + " ms");
@@ -96,24 +199,91 @@ public class LauncherDelegator extends TempletonDelegator {
       return new EnqueueBean(id);
     } catch (InterruptedException e) {
       throw new QueueException("Unable to launch job " + e);
+    } finally {
+      if (ugi != null) {
+        FileSystem.closeAllForUGI(ugi);
+      }
     }
   }

-  private String queueAsUser(UserGroupInformation ugi, final List<String> args)
+  private String queueAsUser(UserGroupInformation ugi, final List<String> args,
+                 final TempletonControllerJob controllerJob)
     throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Launching job: " + args);
     }
     return ugi.doAs(new PrivilegedExceptionAction<String>() {
       public String run() throws Exception {
-        String[] array = new String[args.size()];
-        TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf);
-        ToolRunner.run(ctrl, args.toArray(array));
-        return ctrl.getSubmittedId();
+        runTempletonControllerJob(controllerJob, args);
+        return controllerJob.getSubmittedId();
       }
     });
   }

+  /*
+   * Kills the templeton job, with multiple retries, if the job exists. Returns true if a
+   * kill attempt succeeds. Otherwise returns false.
+   */
+  private boolean killTempletonJobWithRetry(String user, String jobId) {
+    /*
+     * Null-safe check whether the job submission has gone through and the job id is valid.
+     */
+    if (StringUtils.startsWith(jobId, "job_")) {
+      LOG.info("Started killing the job " + jobId);
+
+      boolean success = false;
+      int count = 0;
+      do {
+        try {
+          count++;
+          killJob(user, jobId);
+          success = true;
+          LOG.info("Kill job attempt succeeded.");
+        } catch (Exception e) {
+          LOG.info("Failed to kill the job due to exception: " + e.getMessage());
+          LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s before retrying "
+                 + "the operation. Iteration: " + count);
Iteration: " + count); + try { + Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000); + } catch (InterruptedException ex) { + LOG.info("Got interrupted while waiting for next retry."); + } + } + } while (!success && count < jobTimeoutTaskRetryCount); + + return success; + } else { + LOG.info("Couldn't find a valid job id after job request is timed out."); + return false; + } + } + + /* + * Gets new templeton controller objects. + */ + protected TempletonControllerJob getTempletonController() { + return new TempletonControllerJob(secureMeatastoreAccess, appConf); + } + + /* + * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run + * the actual job. + */ + protected int runTempletonControllerJob(TempletonControllerJob controllerJob, List<String> args) + throws IOException, InterruptedException, TimeoutException, Exception { + String[] array = new String[args.size()]; + return ToolRunner.run(controllerJob, args.toArray(array)); + } + + /* + * Uses DeleteDelegator to kill a job and ignores all exceptions. + */ + protected void killJob(String user, String jobId) + throws NotAuthorizedException, BadParam, IOException, InterruptedException { + DeleteDelegator d = new DeleteDelegator(appConf); + d.run(user, jobId); + } + public List<String> makeLauncherArgs(AppConfig appConf, String statusdir, String completedUrl, List<String> copyFiles, @@ -180,24 +350,35 @@ public class LauncherDelegator extends TempletonDelegator { */ private String getShimLibjars() { WebHCatJTShim shim = null; + UserGroupInformation ugi = null; try { - shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, UserGroupInformation.getCurrentUser()); + ugi = UserGroupInformation.getCurrentUser(); + shim = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi); + + // Besides the HiveShims jar which is Hadoop version dependent we also + // always need to include hive shims common jars. + Path shimCommonJar = new Path( + TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN)); + Path shimCommonSecureJar = new Path( + TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN)); + Path shimJar = new Path( + TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN)); + + return String.format( + "%s,%s,%s", + shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString()); } catch (IOException e) { - throw new RuntimeException("Failed to get WebHCatShim", e); + throw new RuntimeException("Failed to get shimLibJars", e); + } finally { + try { + if (ugi != null) { + FileSystem.closeAllForUGI(ugi); + } + } catch (IOException e) { + throw new RuntimeException("Failed to closeAllForUGI", e); + } } - // Besides the HiveShims jar which is Hadoop version dependent we also - // always need to include hive shims common jars. - Path shimCommonJar = new Path( - TempletonUtils.findContainingJar(ShimLoader.class, HIVE_SHIMS_FILENAME_PATTERN)); - Path shimCommonSecureJar = new Path( - TempletonUtils.findContainingJar(HadoopShimsSecure.class, HIVE_SHIMS_FILENAME_PATTERN)); - Path shimJar = new Path( - TempletonUtils.findContainingJar(shim.getClass(), HIVE_SHIMS_FILENAME_PATTERN)); - - return String.format( - "%s,%s,%s", - shimCommonJar.toString(), shimCommonSecureJar.toString(), shimJar.toString()); } // Storage vars @@ -263,7 +444,7 @@ public class LauncherDelegator extends TempletonDelegator { } /** * This is called by subclasses when they determined that the sumbmitted job requires - * metastore access (e.g. 
+   * metastore access (e.g. Pig job that uses HCatalog). This then determines if
    * secure access is required and causes TempletonControllerJob to set up a delegation token.
    * @see TempletonControllerJob
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
index a30ecd1..dfa59f8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
@@ -19,9 +19,15 @@ package org.apache.hive.hcatalog.templeton;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobStatus;
@@ -31,20 +37,82 @@ import org.apache.hadoop.security.UserGroupInformation;
  * List jobs owned by a user.
  */
 public class ListDelegator extends TempletonDelegator {
+  private static final Log LOG = LogFactory.getLog(ListDelegator.class);
+  private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute";
+
+  /**
+   * Name of the current thread, recorded so that execution threads can include it in
+   * their own thread names.
+   */
+  private final String listThreadId = Thread.currentThread().getName();
+
+  /*
+   * Job request executor to list job status requests.
+   */
+  private static JobRequestExecutor<List<JobItemBean>> jobRequest =
+    new JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List,
+                 AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);
+
   public ListDelegator(AppConfig appConf) {
     super(appConf);
   }

-  public List<String> run(String user, boolean showall)
+  /*
+   * Lists job statuses. If maximum concurrent job list requests are configured then the
+   * list request is executed on a thread from the thread pool. If a job list request time
+   * out is configured then the request execution thread is interrupted when it times out,
+   * and no further action is taken.
+   */
+  public List<JobItemBean> run(final String user, final boolean showall, final String jobId,
+                 final int numRecords, final boolean showDetails)
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException,
+                 TimeoutException, ExecutionException, TooManyRequestsException {
+
+    if (jobRequest.isThreadPoolEnabled()) {
+      return jobRequest.execute(getJobListTask(user, showall, jobId, numRecords, showDetails));
+    } else {
+      return listJobs(user, showall, jobId, numRecords, showDetails);
+    }
+  }
+
+  /*
+   * Job callable task for the job list operation. Overrides the behavior of execute() to
+   * list jobs. No need to override the behavior of cleanup() as there is nothing to be done
+   * if the list jobs operation is timed out or interrupted.
+   */
+  private JobCallable<List<JobItemBean>> getJobListTask(final String user, final boolean showall,
+                 final String jobId, final int numRecords, final boolean showDetails) {
+    return new JobCallable<List<JobItemBean>>() {
+      @Override
+      public List<JobItemBean> execute() throws NotAuthorizedException, BadParam, IOException,
+                 InterruptedException {
+        /*
+         * Change the current thread name to include the parent thread id if it is executed
+         * in the thread pool. Useful to extract logs specific to a job request and helpful
+         * to debug job issues.
+         */
+        Thread.currentThread().setName(String.format("%s-%s-%s", JOB_LIST_EXECUTE_THREAD_PREFIX,
+                 listThreadId, Thread.currentThread().getId()));
+
+        return listJobs(user, showall, jobId, numRecords, showDetails);
+      }
+    };
+  }
+
+  /*
+   * Gets the list of job ids and calls getJobStatus to get the status for each job id.
+   */
+  public List<JobItemBean> listJobs(String user, boolean showall, String jobId,
+                 int numRecords, boolean showDetails)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException {

-    UserGroupInformation ugi = UgiFactory.getUgi(user);
+    UserGroupInformation ugi = null;
     WebHCatJTShim tracker = null;
+    ArrayList<String> ids = new ArrayList<String>();
+
     try {
+      ugi = UgiFactory.getUgi(user);
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
-      ArrayList<String> ids = new ArrayList<String>();
-
       JobStatus[] jobs = tracker.getAllJobs();

       if (jobs != null) {
@@ -54,13 +122,81 @@ public class ListDelegator extends TempletonDelegator {
           ids.add(id);
         }
       }
-
-    return ids;
     } catch (IllegalStateException e) {
       throw new BadParam(e.getMessage());
     } finally {
       if (tracker != null)
         tracker.close();
+      if (ugi != null)
+        FileSystem.closeAllForUGI(ugi);
     }
+
+    return getJobStatus(ids, user, showall, jobId, numRecords, showDetails);
+  }
+
+  /*
+   * Returns the job status for a list of input jobs as a list.
+   */
+  public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, boolean showall,
+                 String jobId, int numRecords, boolean showDetails)
+    throws IOException, InterruptedException {
+
+    List<JobItemBean> detailList = new ArrayList<JobItemBean>();
+    int currRecord = 0;
+
+    // Sort the list as requested
+    boolean isAscendingOrder = true;
+    switch (appConf.getListJobsOrder()) {
+    case lexicographicaldesc:
+      Collections.sort(jobIds, Collections.reverseOrder());
+      isAscendingOrder = false;
+      break;
+    case lexicographicalasc:
+    default:
+      Collections.sort(jobIds);
+      break;
+    }
+
+    for (String job : jobIds) {
+      // If numRecords = -1, fetch all records.
+      // Hence skip all the below checks when numRecords = -1.
+      if (numRecords != -1) {
+        // If currRecord >= numRecords, we have already fetched the top #numRecords
+        if (currRecord >= numRecords) {
+          break;
+        }
+        else if (jobId == null || jobId.trim().length() == 0) {
+          currRecord++;
+        }
+        // If the current record needs to be returned based on the
+        // filter conditions specified by the user, increment the counter
+        else if (isAscendingOrder && job.compareTo(jobId) > 0 || !isAscendingOrder && job.compareTo(jobId) < 0) {
+          currRecord++;
+        }
+        // The current record should not be included in the output detailList.
+        else {
+          continue;
+        }
+      }
+      JobItemBean jobItem = new JobItemBean();
+      jobItem.id = job;
+      if (showDetails) {
+        StatusDelegator sd = new StatusDelegator(appConf);
+        try {
+          jobItem.detail = sd.run(user, job, false);
+        }
+        catch (Exception ex) {
+          /*
+           * If we could not get the status for some reason, log it and send an empty status
+           * back with just the ID, so that the caller knows to look in the log file.
+           */
+          LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
+          jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
+        }
+      }
+      detailList.add(jobItem);
+    }
+
+    return detailList;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
index 5208bf4..3ed3ece 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Main.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;

 import org.slf4j.Logger;
@@ -43,14 +44,15 @@ import org.eclipse.jetty.rewrite.handler.RedirectPatternRule;
 import org.eclipse.jetty.rewrite.handler.RewriteHandler;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.FilterHolder;
-import org.eclipse.jetty.servlet.FilterMapping;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.xml.XmlConfiguration;
 import org.slf4j.bridge.SLF4JBridgeHandler;
+
+import javax.servlet.DispatcherType;
 import javax.servlet.http.HttpServletRequest;

 /**
@@ -122,7 +124,7 @@ public class Main {
     checkEnv();
     runServer(port);
     // Currently only print the first port to be consistent with old behavior
-    port = ArrayUtils.isEmpty(server.getConnectors()) ? -1 : server.getConnectors()[0].getPort();
+    port = ArrayUtils.isEmpty(server.getConnectors()) ? -1 : ((ServerConnector)(server.getConnectors()[0])).getLocalPort();
     System.out.println("templeton: listening on port " + port);
     LOG.info("Templeton listening on port " + port);
@@ -185,6 +187,7 @@ public class Main {
     // Add the Auth filter
     FilterHolder fHolder = makeAuthFilter();
+    EnumSet<DispatcherType> dispatches = EnumSet.of(DispatcherType.REQUEST);

     /*
      * We add filters for each of the URIs supported by templeton.
@@ -193,28 +196,18 @@
      * This is because mapreduce does not use secure credentials for
      * callbacks. So jetty would fail the request as unauthorized.
      */
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*",
-                   FilterMapping.REQUEST);
-    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*",
-                   FilterMapping.REQUEST);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/ddl/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/pig/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/hive/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/sqoop/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/queue/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/jobs/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/mapreduce/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/status/*", dispatches);
+    root.addFilter(fHolder, "/" + SERVLET_PATH + "/v1/version/*", dispatches);

     if (conf.getBoolean(AppConfig.XSRF_FILTER_ENABLED, false)){
-      root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*",
-                   FilterMapping.REQUEST);
+      root.addFilter(makeXSRFFilter(), "/" + SERVLET_PATH + "/*", dispatches);
       LOG.debug("XSRF filter enabled");
     } else {
       LOG.warn("XSRF filter disabled");
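The Main.java hunks above replace Jetty's removed FilterMapping.REQUEST constant with the servlet-spec EnumSet<DispatcherType> overload of ServletContextHandler.addFilter(). A minimal, self-contained sketch of the same registration pattern follows; the class name, path, and filter holder are illustrative placeholders, not WebHCat code:

import java.util.EnumSet;

import javax.servlet.DispatcherType;

import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;

public class FilterWiringSketch {
  /*
   * Registers the filter for plain client requests only, i.e. not for FORWARD,
   * INCLUDE, or ERROR dispatches, which matches the old FilterMapping.REQUEST
   * behavior.
   */
  static void wire(ServletContextHandler root, FilterHolder authFilter) {
    EnumSet<DispatcherType> dispatches = EnumSet.of(DispatcherType.REQUEST);
    root.addFilter(authFilter, "/templeton/v1/ddl/*", dispatches);
  }
}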
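For reference, the admission-control idiom that JobRequestExecutor (earlier in this patch) builds on can be shown in isolation: a ThreadPoolExecutor fed by a SynchronousQueue holds no queued work, so once every worker is busy an extra submit() fails immediately with RejectedExecutionException, which the patch surfaces to clients as TooManyRequestsException. The sketch below is illustrative only; the pool size, sleep time, and class name are made up:

import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BusyRejectingPoolSketch {
  public static void main(String[] args) throws Exception {
    // Two workers and no request queue: a third concurrent submit can neither
    // be handed off to an idle worker nor queued, so it is rejected at once.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2,
        1L, TimeUnit.HOURS, new SynchronousQueue<Runnable>());
    pool.allowCoreThreadTimeOut(true);

    Callable<String> slowTask = () -> {
      Thread.sleep(5000);
      return "done";
    };

    pool.submit(slowTask);
    pool.submit(slowTask);
    try {
      pool.submit(slowTask);
    } catch (RejectedExecutionException e) {
      // This is the point where JobRequestExecutor raises TooManyRequestsException.
      System.out.println("busy: request rejected");
    }
    pool.shutdownNow();
  }
}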
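Similarly, the timeout handling in execute() and the retry loop in cancelExecutePoolThread() are variations of the standard bounded Future.get() plus best-effort cancel idiom, sketched here with made-up timings:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<String> future = pool.submit(() -> {
      Thread.sleep(10000);
      return "ok";
    });
    try {
      // Bounded wait, as in execute() when a request timeout is configured.
      System.out.println(future.get(2, TimeUnit.SECONDS));
    } catch (TimeoutException e) {
      // cancel(true) interrupts the worker; if the task ignores the interrupt
      // it may still run to completion, which is why the patch retries the
      // cancel call in cancelExecutePoolThread().
      boolean cancelled = future.cancel(true);
      System.out.println("timed out, cancel issued: " + cancelled);
    } finally {
      pool.shutdownNow();
    }
  }
}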