HIVE-19173 : Add Storage Handler runtime information as part of DESCRIBE EXTENDED (Nishant Bangarwa via Ashutosh Chauhan)
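In summary, the patch adds a StorageHandlerInfo marker interface (a single formatAsText() method), adds a default getStorageHandlerInfo(Table) hook to HiveStorageHandler, wires the result through DDLTask and the metadata formatters so that DESCRIBE EXTENDED can print it, and implements the hook in DruidStorageHandler by fetching the Kafka supervisor report from the Druid overlord for Kafka streaming tables (falling back to DruidStorageHandlerInfo.UNREACHABLE when the overlord cannot be reached). The sketch below shows how any other storage handler could plug into the same hook. Only the StorageHandlerInfo and getStorageHandlerInfo signatures are taken from this patch; MyStorageHandler and the status text it reports are hypothetical.

// Hypothetical handler illustrating the new hook; not part of this patch.
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;

public class MyStorageHandler extends DefaultStorageHandler {
  @Override
  public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
    // Returning null preserves the old DESCRIBE EXTENDED output; returning a non-null
    // StorageHandlerInfo makes the text formatter append a "StorageHandlerInfo" section
    // containing whatever formatAsText() produces.
    return new StorageHandlerInfo() {
      @Override
      public String formatAsText() {
        return "Runtime status for " + table.getTableName() + ": reachable";
      }
    };
  }
}
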
Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa040c5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa040c5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa040c5b

Branch: refs/heads/branch-3
Commit: aa040c5bfcea2257b4aa89f39832a7d6198a43a0
Parents: b135724
Author: Nishant Bangarwa <nishant.mon...@gmail.com>
Authored: Fri Apr 13 09:43:00 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 8 16:05:54 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  |  70 +++++-
 .../hive/druid/DruidStorageHandlerInfo.java     |  72 ++++++
 .../hive/druid/json/KafkaSupervisorReport.java  | 231 +++++++++++++++++++
 .../hadoop/hive/druid/json/TaskReportData.java  | 125 ++++++++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   7 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  62 +++--
 .../hive/ql/metadata/HiveStorageHandler.java    |  17 +-
 .../hive/ql/metadata/StorageHandlerInfo.java    |  38 +++
 .../formatting/JsonMetaDataFormatter.java       |   6 +-
 .../metadata/formatting/MetaDataFormatter.java  |   4 +-
 .../formatting/TextMetaDataFormatter.java       |  11 +-
 .../clientpositive/druidkafkamini_basic.q       |   6 +-
 .../druid/druidkafkamini_basic.q.out            |  26 ++-
 13 files changed, 633 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index bc08bd8..3e707e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -35,6 +35,7 @@ import com.metamx.http.client.HttpClientInit;
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.StatusResponseHandler;
 import com.metamx.http.client.response.StatusResponseHolder;
+
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -59,6 +60,7 @@ import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
 import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
@@ -82,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import
org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -94,6 +98,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; + import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -116,6 +121,8 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; + import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER; /** @@ -454,7 +461,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor console.printInfo("Druid Kafka Ingestion Reset successful."); } else { throw new IOException(String - .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + .format("Unable to reset Kafka Ingestion Druid status [%d] full response [%s]", response.getStatus().getCode(), response.getContent())); } } catch (Exception e) { @@ -486,7 +493,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor } - public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { + private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { // Stop Kafka Ingestion first final String overlordAddress = Preconditions.checkNotNull(HiveConf .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), @@ -512,7 +519,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor return null; } else { throw new IOException(String - .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + .format("Unable to fetch Kafka Ingestion Spec from Druid status [%d] full response [%s]", response.getStatus().getCode(), response.getContent())); } } catch (Exception e) { @@ -521,6 +528,46 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor } /** + * Fetches kafka supervisor status report from druid overlod. + * @param table + * @return kafka supervisor report or null when druid overlord is unreachable. + */ + @Nullable + private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) { + final String overlordAddress = Preconditions.checkNotNull(HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), + "Druid Overlord Address is null"); + String dataSourceName = Preconditions + .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + "Druid Datasource name is null"); + try { + StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET, + new URL(String + .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, + dataSourceName))), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(), + input -> input instanceof IOException, + getMaxRetryCount()); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + return DruidStorageHandlerUtils.JSON_MAPPER + .readValue(response.getContent(), KafkaSupervisorReport.class); + // Druid Returns 400 Bad Request when not found. 
+ } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) { + LOG.info("No Kafka Supervisor found for datasource[%s]", dataSourceName); + return null; + } else { + LOG.error("Unable to fetch Kafka Supervisor status [%d] full response [%s]", + response.getStatus().getCode(), response.getContent()); + return null; + } + } catch (Exception e) { + LOG.error("Exception while fetching kafka ingestion spec from druid", e); + return null; + } + } + + /** * Creates metadata moves then commit the Segment's metadata to Druid metadata store in one TxN * * @param table Hive table @@ -995,6 +1042,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor updateKafkaIngestion(table); } } + private static <T> Boolean getBooleanProperty(Table table, String propertyName) { String val = getTableProperty(table, propertyName); if (val == null) { @@ -1057,4 +1105,20 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor private int getMaxRetryCount() { return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES); } + + @Override + public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException { + if(isKafkaStreamingTable(table)){ + KafkaSupervisorReport kafkaSupervisorReport = fetchKafkaSupervisorReport(table); + if(kafkaSupervisorReport == null){ + return DruidStorageHandlerInfo.UNREACHABLE; + } + return new DruidStorageHandlerInfo(kafkaSupervisorReport); + } + else + // TODO: Currently we do not expose any runtime info for non-streaming tables. + // In future extend this add more information regarding table status. + // e.g. Total size of segments in druid, loadstatus of table on historical nodes etc. + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java new file mode 100644 index 0000000..f0e1750 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import io.druid.java.util.common.StringUtils; + +import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; + +/** + * DruidStorageHandlerInfo provides a runtime information for DruidStorageHandler. 
+ */ +@SuppressWarnings("serial") +public class DruidStorageHandlerInfo implements StorageHandlerInfo { + + public static final StorageHandlerInfo UNREACHABLE = new StorageHandlerInfo() { + @Override + public String formatAsText() { + return "Druid Overlord is Unreachable, Runtime Status : unknown"; + } + }; + + private final KafkaSupervisorReport kafkaSupervisorReport; + + DruidStorageHandlerInfo(KafkaSupervisorReport kafkaSupervisorReport) { + this.kafkaSupervisorReport = kafkaSupervisorReport; + } + + @Override + public String formatAsText() { + StringBuilder sb = new StringBuilder(); + sb.append("Druid Storage Handler Runtime Status for " + kafkaSupervisorReport.getId()); + sb.append("\n"); + sb.append("kafkaPartitions=" + kafkaSupervisorReport.getPayload().getPartitions()); + sb.append("\n"); + sb.append("activeTasks=" + kafkaSupervisorReport.getPayload().getActiveTasks()); + sb.append("\n"); + sb.append("publishingTasks=" + kafkaSupervisorReport.getPayload().getPublishingTasks()); + if (kafkaSupervisorReport.getPayload().getLatestOffsets() != null) { + sb.append("\n"); + sb.append("latestOffsets=" + kafkaSupervisorReport.getPayload().getLatestOffsets()); + } + if (kafkaSupervisorReport.getPayload().getMinimumLag() != null) { + sb.append("\n"); + sb.append("minimumLag=" + kafkaSupervisorReport.getPayload().getMinimumLag()); + } + if (kafkaSupervisorReport.getPayload().getAggregateLag() != null) { + sb.append("\n"); + sb.append("aggregateLag=" + kafkaSupervisorReport.getPayload().getAggregateLag()); + } + if (kafkaSupervisorReport.getPayload().getOffsetsLastUpdated() != null) { + sb.append("\n"); + sb.append("lastUpdateTime=" + kafkaSupervisorReport.getPayload().getOffsetsLastUpdated()); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java new file mode 100644 index 0000000..5a6756e --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.druid.json; + +import io.druid.guice.annotations.Json; +import io.druid.indexing.overlord.supervisor.SupervisorReport; +import io.druid.java.util.common.IAE; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; + +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class KafkaSupervisorReport extends SupervisorReport +{ + public static class KafkaSupervisorReportPayload + { + private final String dataSource; + private final String topic; + private final Integer partitions; + private final Integer replicas; + private final Long durationSeconds; + private final List<TaskReportData> activeTasks; + private final List<TaskReportData> publishingTasks; + private final Map<Integer, Long> latestOffsets; + private final Map<Integer, Long> minimumLag; + private final Long aggregateLag; + private final DateTime offsetsLastUpdated; + + @JsonCreator + public KafkaSupervisorReportPayload( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("topic") String topic, + @JsonProperty("partitions") Integer partitions, + @JsonProperty("replicas") Integer replicas, + @JsonProperty("durationSeconds") Long durationSeconds, + @Nullable @JsonProperty("latestOffsets") Map<Integer, Long> latestOffsets, + @Nullable @JsonProperty("minimumLag") Map<Integer, Long> minimumLag, + @Nullable @JsonProperty("aggregateLag") Long aggregateLag, + @Nullable @JsonProperty("offsetsLastUpdated") DateTime offsetsLastUpdated + ) + { + this.dataSource = dataSource; + this.topic = topic; + this.partitions = partitions; + this.replicas = replicas; + this.durationSeconds = durationSeconds; + this.activeTasks = Lists.newArrayList(); + this.publishingTasks = Lists.newArrayList(); + this.latestOffsets = latestOffsets; + this.minimumLag = minimumLag; + this.aggregateLag = aggregateLag; + this.offsetsLastUpdated = offsetsLastUpdated; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @JsonProperty + public Integer getPartitions() + { + return partitions; + } + + @JsonProperty + public Integer getReplicas() + { + return replicas; + } + + @JsonProperty + public Long getDurationSeconds() + { + return durationSeconds; + } + + @JsonProperty + public List<TaskReportData> getActiveTasks() + { + return activeTasks; + } + + @JsonProperty + public List<TaskReportData> getPublishingTasks() + { + return publishingTasks; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map<Integer, Long> getLatestOffsets() + { + return latestOffsets; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map<Integer, Long> getMinimumLag() + { + return minimumLag; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getAggregateLag() + { + return aggregateLag; + } + + @JsonProperty + public DateTime getOffsetsLastUpdated() + { + return offsetsLastUpdated; + } + + @Override + public String toString() + { + return "{" + + "dataSource='" + dataSource + '\'' + + ", topic='" + topic + '\'' + + ", partitions=" + partitions + + ", replicas=" + replicas + + ", durationSeconds=" + durationSeconds + + 
", active=" + activeTasks + + ", publishing=" + publishingTasks + + (latestOffsets != null ? ", latestOffsets=" + latestOffsets : "") + + (minimumLag != null ? ", minimumLag=" + minimumLag : "") + + (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") + + (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") + + '}'; + } + } + + private final KafkaSupervisorReportPayload payload; + + @JsonCreator + public KafkaSupervisorReport(@JsonProperty("id") String id, + @JsonProperty("generationTime")DateTime generationTime, + @JsonProperty("payload") KafkaSupervisorReportPayload payload){ + super(id, generationTime); + this.payload = payload; + } + + public KafkaSupervisorReport( + String dataSource, + DateTime generationTime, + String topic, + Integer partitions, + Integer replicas, + Long durationSeconds, + @Nullable Map<Integer, Long> latestOffsets, + @Nullable Map<Integer, Long> minimumLag, + @Nullable Long aggregateLag, + @Nullable DateTime offsetsLastUpdated + ) { + this(dataSource, generationTime, new KafkaSupervisorReportPayload( + dataSource, + topic, + partitions, + replicas, + durationSeconds, + latestOffsets, + minimumLag, + aggregateLag, + offsetsLastUpdated + )); + } + + @Override + public KafkaSupervisorReportPayload getPayload() + { + return payload; + } + + public void addTask(TaskReportData data) + { + if (data.getType().equals(TaskReportData.TaskType.ACTIVE)) { + payload.activeTasks.add(data); + } else if (data.getType().equals(TaskReportData.TaskType.PUBLISHING)) { + payload.publishingTasks.add(data); + } else { + throw new IAE("Unknown task type [%s]", data.getType().name()); + } + } + + @Override + public String toString() + { + return "{" + + "id='" + getId() + '\'' + + ", generationTime=" + getGenerationTime() + + ", payload=" + payload + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java new file mode 100644 index 0000000..94a3f7f --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.joda.time.DateTime; + +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class TaskReportData +{ + public enum TaskType + { + ACTIVE, PUBLISHING, UNKNOWN + } + + private final String id; + private final Map<Integer, Long> startingOffsets; + private final DateTime startTime; + private final Long remainingSeconds; + private final TaskType type; + private final Map<Integer, Long> currentOffsets; + private final Map<Integer, Long> lag; + + public TaskReportData( + String id, + @Nullable Map<Integer, Long> startingOffsets, + @Nullable Map<Integer, Long> currentOffsets, + DateTime startTime, + Long remainingSeconds, + TaskType type, + @Nullable Map<Integer, Long> lag + ) + { + this.id = id; + this.startingOffsets = startingOffsets; + this.currentOffsets = currentOffsets; + this.startTime = startTime; + this.remainingSeconds = remainingSeconds; + this.type = type; + this.lag = lag; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map<Integer, Long> getStartingOffsets() + { + return startingOffsets; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map<Integer, Long> getCurrentOffsets() + { + return currentOffsets; + } + + @JsonProperty + public DateTime getStartTime() + { + return startTime; + } + + @JsonProperty + public Long getRemainingSeconds() + { + return remainingSeconds; + } + + @JsonProperty + public TaskType getType() + { + return type; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map<Integer, Long> getLag() + { + return lag; + } + + @Override + public String toString() + { + return "{" + + "id='" + id + '\'' + + (startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") + + (currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") + + ", startTime=" + startTime + + ", remainingSeconds=" + remainingSeconds + + (lag != null ? 
", lag=" + lag : "") + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index b3c95eb..c9c5054 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec; import static org.apache.commons.lang.StringUtils.join; -import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; import java.io.BufferedWriter; @@ -78,7 +77,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.PartitionDropOptions; import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.TableType; @@ -166,6 +164,7 @@ import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; @@ -3757,6 +3756,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { NotNullConstraint nnInfo = null; DefaultConstraint dInfo = null; CheckConstraint cInfo = null; + StorageHandlerInfo storageHandlerInfo = null; if (descTbl.isExt() || descTbl.isFormatted()) { pkInfo = db.getPrimaryKeys(tbl.getDbName(), tbl.getTableName()); fkInfo = db.getForeignKeys(tbl.getDbName(), tbl.getTableName()); @@ -3764,6 +3764,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { nnInfo = db.getNotNullConstraints(tbl.getDbName(), tbl.getTableName()); dInfo = db.getDefaultConstraints(tbl.getDbName(), tbl.getTableName()); cInfo = db.getCheckConstraints(tbl.getDbName(), tbl.getTableName()); + storageHandlerInfo = db.getStorageHandlerInfo(tbl); } fixDecimalColumnTypeName(cols); // In case the query is served by HiveServer2, don't pad it with spaces, @@ -3772,7 +3773,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { formatter.describeTable(outStream, colPath, tableName, tbl, part, cols, descTbl.isFormatted(), descTbl.isExt(), isOutputPadded, colStats, - pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo); + pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo, storageHandlerInfo); LOG.debug("DDLTask: written data for {}", tableName); http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 3218f96..64b3f83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ 
-58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.jdo.JDODataStoreException; import com.google.common.collect.ImmutableList; @@ -95,7 +96,6 @@ import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable; import org.apache.hadoop.hive.common.log.InPlaceUpdate; -import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HdfsUtils; @@ -4166,29 +4166,14 @@ private void constructOneLBLocationMap(FileStatus fSta, private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException { HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { - @Override - public HiveMetaHook getHook( - org.apache.hadoop.hive.metastore.api.Table tbl) - throws MetaException { - - try { - if (tbl == null) { - return null; - } - HiveStorageHandler storageHandler = - HiveUtils.getStorageHandler(conf, - tbl.getParameters().get(META_TABLE_STORAGE)); - if (storageHandler == null) { - return null; - } - return storageHandler.getMetaHook(); - } catch (HiveException ex) { - LOG.error(StringUtils.stringifyException(ex)); - throw new MetaException( - "Failed to load storage handler: " + ex.getMessage()); - } - } - }; + @Override + public HiveMetaHook getHook( + org.apache.hadoop.hive.metastore.api.Table tbl) + throws MetaException { + HiveStorageHandler storageHandler = createStorageHandler(tbl); + return storageHandler == null ? null : storageHandler.getMetaHook(); + } + }; if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded); @@ -4198,6 +4183,22 @@ private void constructOneLBLocationMap(FileStatus fSta, } } + @Nullable + private HiveStorageHandler createStorageHandler(org.apache.hadoop.hive.metastore.api.Table tbl) throws MetaException { + try { + if (tbl == null) { + return null; + } + HiveStorageHandler storageHandler = + HiveUtils.getStorageHandler(conf, tbl.getParameters().get(META_TABLE_STORAGE)); + return storageHandler; + } catch (HiveException ex) { + LOG.error(StringUtils.stringifyException(ex)); + throw new MetaException( + "Failed to load storage handler: " + ex.getMessage()); + } + } + public static class SchemaException extends MetaException { private static final long serialVersionUID = 1L; public SchemaException(String message) { @@ -5115,4 +5116,15 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new HiveException(e); } } -}; + + @Nullable + public StorageHandlerInfo getStorageHandlerInfo(Table table) + throws HiveException { + try { + HiveStorageHandler storageHandler = createStorageHandler(table.getTTable()); + return storageHandler == null ? 
null : storageHandler.getStorageHandlerInfo(table.getTTable()); + } catch (Exception e) { + throw new HiveException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 99bb9f6..1696243 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hive.ql.metadata; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -149,7 +152,19 @@ public interface HiveStorageHandler extends Configurable { * Called just before submitting MapReduce job. * * @param tableDesc descriptor for the table being accessed - * @param JobConf jobConf for MapReduce job + * @param jobConf jobConf for MapReduce job */ public void configureJobConf(TableDesc tableDesc, JobConf jobConf); + + /** + * Used to fetch runtime information about storage handler during DESCRIBE EXTENDED statement + * + * @param table table definition + * @return StorageHandlerInfo containing runtime information about storage handler + * OR `null` if the storage handler choose to not provide any runtime information. + */ + public default StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException + { + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java new file mode 100644 index 0000000..dbc44a6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.metadata; + +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * StorageHandlerInfo is a marker interface used to provide runtime information associated with a storage handler. + */ +public interface StorageHandlerInfo extends Serializable { + /** + * Called from Describe Extended Statement when Formatter is Text Formatter. + * @return Formatted StorageHandlerInfo as String + */ + String formatAsText(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java index cd70eee..c21967c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; import org.codehaus.jackson.JsonGenerator; @@ -117,7 +118,7 @@ public class JsonMetaDataFormatter implements MetaDataFormatter { boolean isOutputPadded, List<ColumnStatisticsObj> colStats, PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo, UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, - CheckConstraint cInfo) throws HiveException { + CheckConstraint cInfo, StorageHandlerInfo storageHandlerInfo) throws HiveException { MapBuilder builder = MapBuilder.create(); builder.put("columns", makeColsUnformatted(cols)); @@ -146,6 +147,9 @@ public class JsonMetaDataFormatter implements MetaDataFormatter { if (cInfo != null && !cInfo.getCheckConstraints().isEmpty()) { builder.put("checkConstraintInfo", cInfo); } + if(storageHandlerInfo != null) { + builder.put("storageHandlerInfo", storageHandlerInfo.toString()); + } } asJson(out, builder.build()); http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java index ed2cdd1..d15016c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; @@ -91,7 +92,8 @@ public interface 
MetaDataFormatter { boolean isFormatted, boolean isExt, boolean isOutputPadded, List<ColumnStatisticsObj> colStats, PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo, - UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo) + UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo, + StorageHandlerInfo storageHandlerInfo) throws HiveException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java index 63a2969..2529923 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hive.common.util.HiveStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +133,8 @@ class TextMetaDataFormatter implements MetaDataFormatter { boolean isFormatted, boolean isExt, boolean isOutputPadded, List<ColumnStatisticsObj> colStats, PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo, - UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo) + UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo, + StorageHandlerInfo storageHandlerInfo) throws HiveException { try { List<FieldSchema> partCols = tbl.isPartitioned() ? 
tbl.getPartCols() : null; @@ -252,6 +254,13 @@ class TextMetaDataFormatter implements MetaDataFormatter { outStream.write(terminator); } } + + if (storageHandlerInfo!= null) { + outStream.write(("StorageHandlerInfo").getBytes("UTF-8")); + outStream.write(terminator); + outStream.write(storageHandlerInfo.formatAsText().getBytes("UTF-8")); + outStream.write(terminator); + } } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q index f4fd2a6..4c30cdd 100644 --- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q +++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -9,7 +9,7 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, la "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.taskDuration" = "PT20S", "druid.kafka.ingestion.period" = "PT1S" ); @@ -18,7 +18,7 @@ ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START' !curl -ss http://localhost:8081/druid/indexer/v1/supervisor; -- Sleep for some time for ingestion tasks to ingest events -!sleep 50; +!sleep 60; DESCRIBE druid_kafka_test; DESCRIBE EXTENDED druid_kafka_test; @@ -32,7 +32,7 @@ Select page FROM druid_kafka_test order by page; ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET'); -- Sleep for some time for ingestion tasks to ingest events -!sleep 50; +!sleep 60; DESCRIBE druid_kafka_test; DESCRIBE EXTENDED druid_kafka_test; http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out index 6f553fa..c2cc249 100644 --- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -8,7 +8,7 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, ` "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.taskDuration" = "PT20S", "druid.kafka.ingestion.period" = "PT1S" ) PREHOOK: type: CREATETABLE @@ -24,7 +24,7 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.taskDuration" = "PT20S", "druid.kafka.ingestion.period" = "PT1S" ) POSTHOOK: type: CREATETABLE @@ -65,6 +65,15 @@ added int from deserializer deleted int from deserializer #### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +latestOffsets={0=10} +minimumLag={} +aggregateLag=0 +#### A masked pattern was 
here #### PREHOOK: query: Select count(*) FROM druid_kafka_test PREHOOK: type: QUERY PREHOOK: Input: default@druid_kafka_test @@ -126,6 +135,15 @@ added int from deserializer deleted int from deserializer #### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +latestOffsets={0=10} +minimumLag={} +aggregateLag=0 +#### A masked pattern was here #### PREHOOK: query: Select count(*) FROM druid_kafka_test PREHOOK: type: QUERY PREHOOK: Input: default@druid_kafka_test @@ -331,7 +349,7 @@ STAGE PLANS: druid.kafka.ingestion.maxRowsInMemory 5 druid.kafka.ingestion.period PT1S druid.kafka.ingestion.startDelay PT1S - druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.taskDuration PT20S druid.kafka.ingestion.useEarliestOffset true druid.query.granularity MINUTE druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} @@ -370,7 +388,7 @@ STAGE PLANS: druid.kafka.ingestion.maxRowsInMemory 5 druid.kafka.ingestion.period PT1S druid.kafka.ingestion.startDelay PT1S - druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.taskDuration PT20S druid.kafka.ingestion.useEarliestOffset true druid.query.granularity MINUTE druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
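
The q.out changes above show how the new section renders for the druid_kafka_test table (kafkaPartitions, activeTasks, publishingTasks, latestOffsets, minimumLag, aggregateLag). Because the section is simply appended to the DESCRIBE EXTENDED result set, any client can read it; below is a minimal JDBC sketch under the assumption of a local HiveServer2 and the druid_kafka_test table from the q file above. The URL, credentials and table name are placeholders; only the DESCRIBE EXTENDED statement and the presence of the StorageHandlerInfo section come from this patch.

// Minimal JDBC sketch; connection details are placeholders, not part of this patch.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class DescribeExtendedExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:10000/default", "hive", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("DESCRIBE EXTENDED druid_kafka_test")) {
      ResultSetMetaData meta = rs.getMetaData();
      while (rs.next()) {
        // With this patch, Druid Kafka tables get an extra "StorageHandlerInfo" section
        // after the usual rows; which column carries the text depends on the formatter,
        // so this simply prints every column of every row.
        StringBuilder row = new StringBuilder();
        for (int i = 1; i <= meta.getColumnCount(); i++) {
          row.append(rs.getString(i)).append('\t');
        }
        System.out.println(row.toString().trim());
      }
    }
  }
}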