http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java new file mode 100644 index 0000000..1bb77a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +/** + * This implements a local file based backend for storing application timeline + * information. This implementation may not provide a complete implementation of + * all the necessary features. This implementation is provided solely for basic + * testing purposes, and should not be used in a non-test situation. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FileSystemTimelineWriterImpl extends AbstractService + implements TimelineWriter { + + private String outputRoot; + + /** Config param for timeline service storage tmp root for FILE YARN-3264. */ + public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT + = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + + /** default value for storage location on local disk. */ + public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + = "/tmp/timeline_service_data"; + + public static final String ENTITIES_DIR = "entities"; + + /** Default extension for output files. 
*/ + public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + + FileSystemTimelineWriterImpl() { + super((FileSystemTimelineWriterImpl.class.getName())); + } + + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities entities) throws IOException { + TimelineWriteResponse response = new TimelineWriteResponse(); + for (TimelineEntity entity : entities.getEntities()) { + write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, + response); + } + return response; + } + + private synchronized void write(String clusterId, String userId, + String flowName, String flowVersion, long flowRun, String appId, + TimelineEntity entity, TimelineWriteResponse response) + throws IOException { + PrintWriter out = null; + try { + String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, + escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId, + entity.getType()); + String fileName = dir + entity.getId() + + TIMELINE_SERVICE_STORAGE_EXTENSION; + out = + new PrintWriter(new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(fileName, true), "UTF-8"))); + out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); + out.write("\n"); + } catch (IOException ioe) { + TimelineWriteError error = new TimelineWriteError(); + error.setEntityId(entity.getId()); + error.setEntityType(entity.getType()); + /* + * TODO: set an appropriate error code after PoC could possibly be: + * error.setErrorCode(TimelineWriteError.IO_EXCEPTION); + */ + response.addError(error); + } finally { + if (out != null) { + out.close(); + } + } + } + + @Override + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + + } + + public String getOutputRoot() { + return outputRoot; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + 
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, + DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); + } + + @Override + public void serviceStart() throws Exception { + mkdirs(outputRoot, ENTITIES_DIR); + } + + @Override + public void flush() throws IOException { + // no op + } + + private static String mkdirs(String... dirStrs) throws IOException { + StringBuilder path = new StringBuilder(); + for (String dirStr : dirStrs) { + path.append(dirStr).append('/'); + File dir = new File(path.toString()); + if (!dir.exists()) { + if (!dir.mkdirs()) { + throw new IOException("Could not create directories for " + dir); + } + } + } + return path.toString(); + } + + // specifically escape the separator character + private static String escape(String str) { + return str.replace(File.separatorChar, '_'); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java new file mode 100644 index 0000000..a384a84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + + +import java.io.IOException; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; + +/** + * HBase based implementation for {@link TimelineReader}. 
+ */ +public class HBaseTimelineReaderImpl + extends AbstractService implements TimelineReader { + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineReaderImpl.class); + + private Configuration hbaseConf = null; + private Connection conn; + + public HBaseTimelineReaderImpl() { + super(HBaseTimelineReaderImpl.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + hbaseConf = HBaseConfiguration.create(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + } + + @Override + protected void serviceStop() throws Exception { + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + super.serviceStop(); + } + + @Override + public TimelineEntity getEntity(TimelineReaderContext context, + TimelineDataToRetrieve dataToRetrieve) throws IOException { + TimelineEntityReader reader = + TimelineEntityReaderFactory.createSingleEntityReader(context, + dataToRetrieve); + return reader.readEntity(hbaseConf, conn); + } + + @Override + public Set<TimelineEntity> getEntities(TimelineReaderContext context, + TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) + throws IOException { + TimelineEntityReader reader = + TimelineEntityReaderFactory.createMultipleEntitiesReader(context, + filters, dataToRetrieve); + return reader.readEntities(hbaseConf, conn); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java new file mode 100644 index 0000000..3511a2f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -0,0 +1,571 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +/** + * This implements a hbase based backend for storing the timeline entity + * information. 
+ * It writes to multiple tables at the backend + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + + private Connection conn; + private TypedBufferedMutator<EntityTable> entityTable; + private TypedBufferedMutator<AppToFlowTable> appToFlowTable; + private TypedBufferedMutator<ApplicationTable> applicationTable; + private TypedBufferedMutator<FlowActivityTable> flowActivityTable; + private TypedBufferedMutator<FlowRunTable> flowRunTable; + + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter<String> stringKeyConverter = + new StringKeyConverter(); + + /** + * Used to convert Long key components to and from storage format. + */ + private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); + + public HBaseTimelineWriterImpl() { + super(HBaseTimelineWriterImpl.class.getName()); + } + + public HBaseTimelineWriterImpl(Configuration conf) throws IOException { + super(conf.get("yarn.application.id", + HBaseTimelineWriterImpl.class.getName())); + } + + /** + * initializes the hbase connection to write to the entity table. + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + Configuration hbaseConf = HBaseConfiguration.create(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + entityTable = new EntityTable().getTableMutator(hbaseConf, conn); + appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); + applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); + flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); + flowActivityTable = + new FlowActivityTable().getTableMutator(hbaseConf, conn); + } + + /** + * Stores the entire information in TimelineEntities to the timeline store. 
+ */ + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities data) throws IOException { + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + // defensive coding to avoid NPE during row key construction + if ((flowName == null) || (appId == null) || (clusterId == null) + || (userId == null)) { + LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId + + " userId=" + userId + " clusterId=" + clusterId + + " . Not proceeding with writing to hbase"); + return putStatus; + } + + for (TimelineEntity te : data.getEntities()) { + + // a set can have at most 1 null + if (te == null) { + continue; + } + + // if the entity is the application, the destination is the application + // table + boolean isApplication = isApplicationEntity(te); + byte[] rowKey; + if (isApplication) { + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(clusterId, userId, flowName, flowRunId, + appId); + rowKey = applicationRowKey.getRowKey(); + } else { + EntityRowKey entityRowKey = + new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, + te.getType(), te.getId()); + rowKey = entityRowKey.getRowKey(); + } + + storeInfo(rowKey, te, flowVersion, isApplication); + storeEvents(rowKey, te.getEvents(), isApplication); + storeConfig(rowKey, te.getConfigs(), isApplication); + storeMetrics(rowKey, te.getMetrics(), isApplication); + storeRelations(rowKey, te, isApplication); + + if (isApplication) { + TimelineEvent event = + getApplicationEvent(te, + ApplicationMetricsConstants.CREATED_EVENT_TYPE); + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(clusterId, userId, flowName, flowRunId); + if (event != null) { + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(clusterId, appId); + onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId, + flowVersion, te, event.getTimestamp()); + } + // if it's an application entity, 
store metrics + storeFlowMetricsAppRunning(flowRunRowKey, appId, te); + // if application has finished, store it's finish time and write final + // values of all metrics + event = getApplicationEvent(te, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + if (event != null) { + onApplicationFinished(flowRunRowKey, flowVersion, appId, te, + event.getTimestamp()); + } + } + } + return putStatus; + } + + private void onApplicationCreated(FlowRunRowKey flowRunRowKey, + AppToFlowRowKey appToFlowRowKey, String appId, String userId, + String flowVersion, TimelineEntity te, long appCreatedTimeStamp) + throws IOException { + + String flowName = flowRunRowKey.getFlowName(); + Long flowRunId = flowRunRowKey.getFlowRunId(); + + // store in App to flow table + byte[] rowKey = appToFlowRowKey.getRowKey(); + AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); + AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); + + // store in flow run table + storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); + + // store in flow activity table + byte[] flowActivityRowKeyBytes = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) + .getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes, + flowActivityTable, qualifier, null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + /* + * updates the {@link FlowRunTable} with Application Created information + */ + private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te) throws IOException { + byte[] rowKey = flowRunRowKey.getRowKey(); + FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, + te.getCreatedTime(), + 
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + + /* + * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an + * application has finished + */ + private void onApplicationFinished(FlowRunRowKey flowRunRowKey, + String flowVersion, String appId, TimelineEntity te, + long appFinishedTimeStamp) throws IOException { + // store in flow run table + storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te, + appFinishedTimeStamp); + + // indicate in the flow activity table that the app has finished + byte[] rowKey = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appFinishedTimeStamp, flowRunRowKey.getUserId(), + flowRunRowKey.getFlowName()).getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, + null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + /* + * Update the {@link FlowRunTable} with Application Finished information + */ + private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te, long appFinishedTimeStamp) + throws IOException { + byte[] rowKey = flowRunRowKey.getRowKey(); + Attribute attributeAppId = + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); + FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, + appFinishedTimeStamp, attributeAppId); + + // store the final value of metrics since application has finished + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + storeFlowMetrics(rowKey, metrics, attributeAppId, + AggregationOperation.SUM_FINAL.getAttribute()); + } + } + + /* + * Updates the {@link FlowRunTable} with Application Metrics + */ + private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te) throws IOException { + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + byte[] rowKey 
= flowRunRowKey.getRowKey(); + storeFlowMetrics(rowKey, metrics, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), + AggregationOperation.SUM.getAttribute()); + } + } + + private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, + Attribute... attributes) throws IOException { + for (TimelineMetric metric : metrics) { + byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue(), + attributes); + } + } + } + + private void storeRelations(byte[] rowKey, TimelineEntity te, + boolean isApplication) throws IOException { + if (isApplication) { + storeRelations(rowKey, te.getIsRelatedToEntities(), + ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + ApplicationColumnPrefix.RELATES_TO, applicationTable); + } else { + storeRelations(rowKey, te.getIsRelatedToEntities(), + EntityColumnPrefix.IS_RELATED_TO, entityTable); + storeRelations(rowKey, te.getRelatesToEntities(), + EntityColumnPrefix.RELATES_TO, entityTable); + } + } + + /** + * Stores the Relations from the {@linkplain TimelineEntity} object. + */ + private <T> void storeRelations(byte[] rowKey, + Map<String, Set<String>> connectedEntities, + ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) + throws IOException { + for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities + .entrySet()) { + // id3?id4?id5 + String compoundValue = + Separator.VALUES.joinEncoded(connectedEntity.getValue()); + columnPrefix.store(rowKey, table, + stringKeyConverter.encode(connectedEntity.getKey()), null, + compoundValue); + } + } + + /** + * Stores information from the {@linkplain TimelineEntity} object. 
+ */ + private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, + boolean isApplication) throws IOException { + + if (isApplication) { + ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); + ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, + te.getCreatedTime()); + ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, + flowVersion); + Map<String, Object> info = te.getInfo(); + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, + stringKeyConverter.encode(entry.getKey()), null, + entry.getValue()); + } + } + } else { + EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); + EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); + EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, + te.getCreatedTime()); + EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); + Map<String, Object> info = te.getInfo(); + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + EntityColumnPrefix.INFO.store(rowKey, entityTable, + stringKeyConverter.encode(entry.getKey()), null, + entry.getValue()); + } + } + } + } + + /** + * stores the config information from {@linkplain TimelineEntity}. + */ + private void storeConfig(byte[] rowKey, Map<String, String> config, + boolean isApplication) throws IOException { + if (config == null) { + return; + } + for (Map.Entry<String, String> entry : config.entrySet()) { + byte[] configKey = stringKeyConverter.encode(entry.getKey()); + if (isApplication) { + ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, + configKey, null, entry.getValue()); + } else { + EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey, + null, entry.getValue()); + } + } + } + + /** + * stores the {@linkplain TimelineMetric} information from the + * {@linkplain TimelineEvent} object. 
+ */ + private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics, + boolean isApplication) throws IOException { + if (metrics != null) { + for (TimelineMetric metric : metrics) { + byte[] metricColumnQualifier = + stringKeyConverter.encode(metric.getId()); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + if (isApplication) { + ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } else { + EntityColumnPrefix.METRIC.store(rowKey, entityTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } + } + } + } + } + + /** + * Stores the events from the {@linkplain TimelineEvent} object. + */ + private void storeEvents(byte[] rowKey, Set<TimelineEvent> events, + boolean isApplication) throws IOException { + if (events != null) { + for (TimelineEvent event : events) { + if (event != null) { + String eventId = event.getId(); + if (eventId != null) { + long eventTimestamp = event.getTimestamp(); + // if the timestamp is not set, use the current timestamp + if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { + LOG.warn("timestamp is not set for event " + eventId + + "! 
Using the current timestamp"); + eventTimestamp = System.currentTimeMillis(); + } + Map<String, Object> eventInfo = event.getInfo(); + if ((eventInfo == null) || (eventInfo.size() == 0)) { + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, null) + .getColumnQualifier(); + if (isApplication) { + ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, + columnQualifierBytes, null, Separator.EMPTY_BYTES); + } else { + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + columnQualifierBytes, null, Separator.EMPTY_BYTES); + } + } else { + for (Map.Entry<String, Object> info : eventInfo.entrySet()) { + // eventId=infoKey + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, info.getKey()) + .getColumnQualifier(); + if (isApplication) { + ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, + columnQualifierBytes, null, info.getValue()); + } else { + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + columnQualifierBytes, null, info.getValue()); + } + } // for info: eventInfo + } + } + } + } // event : events + } + } + + /** + * Checks if the input TimelineEntity object is an ApplicationEntity. + * + * @param te TimelineEntity object. + * @return true if input is an ApplicationEntity, false otherwise + */ + static boolean isApplicationEntity(TimelineEntity te) { + return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + } + + /** + * @param te TimelineEntity object. + * @param eventId event with this id needs to be fetched + * @return TimelineEvent if TimelineEntity contains the desired event. 
+ */ + private static TimelineEvent getApplicationEvent(TimelineEntity te, + String eventId) { + if (isApplicationEntity(te)) { + for (TimelineEvent event : te.getEvents()) { + if (event.getId().equals(eventId)) { + return event; + } + } + } + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage + * .TimelineWriter#aggregate + * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity, + * org.apache + * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack) + */ + @Override + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush + * () + */ + @Override + public void flush() throws IOException { + // flush all buffered mutators + entityTable.flush(); + appToFlowTable.flush(); + applicationTable.flush(); + flowRunTable.flush(); + flowActivityTable.flush(); + } + + /** + * close the hbase connections The close APIs perform flushing and release any + * resources held. 
+ */ + @Override + protected void serviceStop() throws Exception { + if (entityTable != null) { + LOG.info("closing the entity table"); + // The close API performs flushing and releases any resources held + entityTable.close(); + } + if (appToFlowTable != null) { + LOG.info("closing the app_flow table"); + // The close API performs flushing and releases any resources held + appToFlowTable.close(); + } + if (applicationTable != null) { + LOG.info("closing the application table"); + applicationTable.close(); + } + if (flowRunTable != null) { + LOG.info("closing the flow run table"); + // The close API performs flushing and releases any resources held + flowRunTable.close(); + } + if (flowActivityTable != null) { + LOG.info("closing the flowActivityTable table"); + // The close API performs flushing and releases any resources held + flowActivityTable.close(); + } + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + super.serviceStop(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java new file mode 100644 index 0000000..1484f22 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;

/**
 * Storage interface for YARN timeline service v2 offline aggregation.
 * Concrete subclasses persist pre-aggregated timeline entities to a
 * particular backend.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class OfflineAggregationWriter extends AbstractService {

  /**
   * Construct the offline writer.
   *
   * @param name service name
   */
  public OfflineAggregationWriter(String name) {
    super(name);
  }

  /**
   * Persist aggregated timeline entities to the offline store based on which
   * track this entity is to be rolled up to. The tracks along which
   * aggregations are to be done are given by {@link OfflineAggregationInfo}.
   *
   * @param context a {@link TimelineCollectorContext} object that describes the
   *                context information of the aggregated data. Depending on the
   *                type of the aggregation, some fields of this context may be
   *                empty or null.
   * @param entities {@link TimelineEntities} to be persisted.
   * @param info an {@link OfflineAggregationInfo} object that describes the
   *             detail of the aggregation. Current supported option is
   *             {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
   * @return a {@link TimelineWriteResponse} object.
   * @throws IOException if any problem occurs while writing aggregated
   *         entities.
   */
  abstract TimelineWriteResponse writeAggregatedEntity(
      TimelineCollectorContext context, TimelineEntities entities,
      OfflineAggregationInfo info) throws IOException;
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.phoenix.util.PropertiesUtil;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Offline aggregation Phoenix storage. This storage currently consists of two
 * aggregation tables, one for flow level aggregation and one for user level
 * aggregation.
 *
 * Example table record:
 *
 * <pre>
 * |---------------------------|
 * | Primary   | Column Family |
 * | key       | metrics       |
 * |---------------------------|
 * | row_key   | metricId1:    |
 * |           | metricValue1  |
 * |           | @timestamp1   |
 * |           |               |
 * |           | metricId1:    |
 * |           | metricValue2  |
 * |           | @timestamp2   |
 * |           |               |
 * |           | metricId2:    |
 * |           | metricValue1  |
 * |           | @timestamp2   |
 * |---------------------------|
 * </pre>
 *
 * For the flow aggregation table, the primary key contains user, cluster, and
 * flow id. For user aggregation table, the primary key is user.
 *
 * Metrics column family stores all aggregated metrics for each record.
 */
@Private
@Unstable
public class PhoenixOfflineAggregationWriterImpl
    extends OfflineAggregationWriter {

  private static final Log LOG
      = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
  /**
   * Placeholder dynamic column name; Phoenix requires at least one declared
   * column per column family in the table DDL.
   */
  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
      = "timeline_cf_placeholder";

  /** Default Phoenix JDBC driver name. */
  private static final String DRIVER_CLASS_NAME
      = "org.apache.phoenix.jdbc.PhoenixDriver";

  /** Default Phoenix timeline metric column family prefix. */
  private static final String METRIC_COLUMN_FAMILY = "m.";
  /** Default Phoenix timeline info column family prefix. */
  private static final String INFO_COLUMN_FAMILY = "i.";
  /** Default separator for multi-valued fields in Phoenix storage. */
  private static final String AGGREGATION_STORAGE_SEPARATOR = ";";

  /** Connection string to the deployed Phoenix cluster. */
  private String connString = null;
  /** JDBC connection properties; deep-copied from the caller's when given. */
  private Properties connProperties = new Properties();

  /**
   * Construct a writer with custom JDBC connection properties.
   *
   * @param prop connection properties for the Phoenix JDBC driver; copied
   *             defensively so later mutation by the caller has no effect.
   */
  public PhoenixOfflineAggregationWriterImpl(Properties prop) {
    super(PhoenixOfflineAggregationWriterImpl.class.getName());
    connProperties = PropertiesUtil.deepCopy(prop);
  }

  /** Construct a writer with default JDBC connection properties. */
  public PhoenixOfflineAggregationWriterImpl() {
    super(PhoenixOfflineAggregationWriterImpl.class.getName());
  }

  @Override
  public void serviceInit(Configuration conf) throws Exception {
    // Register the Phoenix JDBC driver with DriverManager.
    Class.forName(DRIVER_CLASS_NAME);
    // Read the connection string from the config, falling back to the
    // default when it is not explicitly configured.
    connString =
        conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
            YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
    super.init(conf);
  }

  @Override
  public TimelineWriteResponse writeAggregatedEntity(
      TimelineCollectorContext context, TimelineEntities entities,
      OfflineAggregationInfo info) throws IOException {
    TimelineWriteResponse response = new TimelineWriteResponse();
    // Upsert the fixed-schema columns (primary key, created_time and the
    // serialized list of metric names) in one prepared statement.
    String sql = "UPSERT INTO " + info.getTableName()
        + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
        + ", created_time, metric_names) "
        + "VALUES ("
        + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
        + "?, ?)";
    if (LOG.isDebugEnabled()) {
      LOG.debug("TimelineEntity write SQL: " + sql);
    }

    try (Connection conn = getConnection();
        PreparedStatement ps = conn.prepareStatement(sql)) {
      for (TimelineEntity entity : entities.getEntities()) {
        // Index metrics by id for the variable-length (dynamic column) write.
        HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
        if (entity.getMetrics() != null) {
          for (TimelineMetric m : entity.getMetrics()) {
            formattedMetrics.put(m.getId(), m);
          }
        }
        int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
        ps.setLong(idx++, entity.getCreatedTime());
        ps.setString(idx++,
            StringUtils.join(formattedMetrics.keySet().toArray(),
                AGGREGATION_STORAGE_SEPARATOR));
        ps.execute();

        storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
            info);

        conn.commit();
      }
    } catch (SQLException se) {
      // Log with the throwable so the stack trace is preserved.
      LOG.error("Failed to add entity to Phoenix", se);
      throw new IOException(se);
    } catch (Exception e) {
      LOG.error("Exception on getting connection", e);
      throw new IOException(e);
    }
    return response;
  }

  /**
   * Create Phoenix tables for offline aggregation storage if the tables do not
   * exist.
   *
   * @throws IOException if any problem happens while creating Phoenix tables.
   */
  public void createPhoenixTables() throws IOException {
    // Create tables if necessary
    try (Connection conn = getConnection();
        Statement stmt = conn.createStatement()) {
      // Table schema defined as in YARN-3817.
      String sql = "CREATE TABLE IF NOT EXISTS "
          + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
          + "flow_name VARCHAR NOT NULL, "
          + "created_time UNSIGNED_LONG, "
          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
          + " VARBINARY, "
          + "metric_names VARCHAR, info_keys VARCHAR "
          + "CONSTRAINT pk PRIMARY KEY("
          + "user, cluster, flow_name))";
      stmt.executeUpdate(sql);
      sql = "CREATE TABLE IF NOT EXISTS "
          + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
          + "created_time UNSIGNED_LONG, "
          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
          + " VARBINARY, "
          + "metric_names VARCHAR, info_keys VARCHAR "
          + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
      stmt.executeUpdate(sql);
      conn.commit();
    } catch (SQLException se) {
      LOG.error("Failed in init data", se);
      throw new IOException(se);
    }
  }

  // Utility functions

  /**
   * Open a JDBC connection to Phoenix with auto-commit disabled; callers are
   * responsible for committing and closing.
   *
   * @return an open {@link Connection}.
   * @throws IOException if the connection cannot be established.
   */
  @Private
  @VisibleForTesting
  Connection getConnection() throws IOException {
    Connection conn;
    try {
      conn = DriverManager.getConnection(connString, connProperties);
      conn.setAutoCommit(false);
    } catch (SQLException se) {
      LOG.error("Failed to connect to phoenix server!", se);
      throw new IOException(se);
    }
    return conn;
  }

  // WARNING: This method will permanently drop a table!
  @Private
  @VisibleForTesting
  void dropTable(String tableName) throws Exception {
    try (Connection conn = getConnection();
        Statement stmt = conn.createStatement()) {
      String sql = "DROP TABLE " + tableName;
      stmt.executeUpdate(sql);
    } catch (SQLException se) {
      LOG.error("Failed in dropping entity table " + tableName, se);
      throw se;
    }
  }

  /**
   * Describes a set of Phoenix dynamic columns that share one column family
   * prefix and one SQL type.
   */
  private static class DynamicColumns<K> {
    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
    private String columnFamilyPrefix;
    private String type;
    private Set<K> columns;

    public DynamicColumns(String columnFamilyPrefix, String type,
        Set<K> keyValues) {
      this.columnFamilyPrefix = columnFamilyPrefix;
      this.columns = keyValues;
      this.type = type;
    }
  }

  /**
   * Append "<family><key> <type>" fragments for each dynamic column to the
   * column list of an UPSERT statement under construction.
   */
  private static <K> StringBuilder appendColumnsSQL(
      StringBuilder colNames, DynamicColumns<K> cfInfo) {
    // Prepare the sql template by iterating through all keys
    for (K key : cfInfo.columns) {
      colNames.append(",").append(cfInfo.columnFamilyPrefix)
          .append(key.toString()).append(cfInfo.type);
    }
    return colNames;
  }

  /**
   * Bind one value per map entry into the prepared statement, starting at
   * {@code startPos}. Collections are joined into a single separator-delimited
   * string; other values are either serialized to bytes (when
   * {@code convertToBytes} is true) or stored via {@code toString()}.
   *
   * @return the next unused parameter position.
   */
  private static <K, V> int setValuesForColumnFamily(
      PreparedStatement ps, Map<K, V> keyValues, int startPos,
      boolean convertToBytes) throws SQLException {
    int idx = startPos;
    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
      V value = entry.getValue();
      if (value instanceof Collection) {
        ps.setString(idx++, StringUtils.join(
            (Collection<?>) value, AGGREGATION_STORAGE_SEPARATOR));
      } else {
        if (convertToBytes) {
          try {
            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
          } catch (IOException ie) {
            LOG.error("Exception in converting values into bytes", ie);
            throw new SQLException(ie);
          }
        } else {
          ps.setString(idx++, value.toString());
        }
      }
    }
    return idx;
  }

  private static <K, V> int setBytesForColumnFamily(
      PreparedStatement ps, Map<K, V> keyValues, int startPos)
      throws SQLException {
    return setValuesForColumnFamily(ps, keyValues, startPos, true);
  }

  private static <K, V> int setStringsForColumnFamily(
      PreparedStatement ps, Map<K, V> keyValues, int startPos)
      throws SQLException {
    return setValuesForColumnFamily(ps, keyValues, startPos, false);
  }

  /**
   * Upsert the entity's variable-length fields (currently its metrics) as
   * Phoenix dynamic columns. A no-op when the entity carries no metrics.
   */
  private static void storeEntityVariableLengthFields(TimelineEntity entity,
      Map<String, TimelineMetric> formattedMetrics,
      TimelineCollectorContext context, Connection conn,
      OfflineAggregationInfo aggregationInfo) throws SQLException {
    int numPlaceholders = 0;
    StringBuilder columnDefs = new StringBuilder(
        StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
    if (formattedMetrics != null && formattedMetrics.size() > 0) {
      appendColumnsSQL(columnDefs, new DynamicColumns<>(
          METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
          formattedMetrics.keySet()));
      numPlaceholders += formattedMetrics.keySet().size();
    }
    if (numPlaceholders == 0) {
      // Nothing beyond the fixed columns to write.
      return;
    }
    StringBuilder placeholders = new StringBuilder();
    placeholders.append(
        StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
    // numPlaceholders >= 1 now
    placeholders.append("?")
        .append(StringUtils.repeat(",?", numPlaceholders - 1));
    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
        .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
        .append(") VALUES(").append(placeholders).append(")").toString();
    if (LOG.isDebugEnabled()) {
      LOG.debug("SQL statement for variable length fields: "
          + sqlVariableLengthFields);
    }
    // Use try with resource statement for the prepared statement
    try (PreparedStatement psVariableLengthFields =
        conn.prepareStatement(sqlVariableLengthFields)) {
      int idx = aggregationInfo.setStringsForPrimaryKey(
          psVariableLengthFields, context, null, 1);
      if (formattedMetrics != null && formattedMetrics.size() > 0) {
        idx = setBytesForColumnFamily(
            psVariableLengthFields, formattedMetrics, idx);
      }
      psVariableLengthFields.execute();
    }
  }
}
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +/** + * specifies the tracks along which an entity + * info is to be aggregated on. + * + */ +public enum TimelineAggregationTrack { + APP, FLOW, USER, QUEUE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java new file mode 100644 index 0000000..e8eabf1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;

/** ATSv2 reader interface. */
@Private
@Unstable
public interface TimelineReader extends Service {

  /**
   * Possible fields to retrieve for {@link #getEntities} and
   * {@link #getEntity}.
   */
  enum Field {
    ALL,
    EVENTS,
    INFO,
    METRICS,
    CONFIGS,
    RELATES_TO,
    IS_RELATED_TO
  }

  /**
   * <p>Fetches a single entity, identified (depending on the entity type)
   * within the scope of the given context.</p>
   *
   * @param context Context which defines the scope in which the query is
   *     made. Use getters of {@link TimelineReaderContext} to fetch context
   *     fields: entityType (mandatory), clusterId (mandatory), userId,
   *     flowName, flowRunId, appId and entityId.<br>
   *     Which additional fields are mandatory depends on the entity type:
   *     <ul>
   *     <li>If entity type is YARN_FLOW_RUN (i.e. query to fetch a specific
   *     flow run), clusterId, userId, flowName and flowRunId are mandatory.
   *     </li>
   *     <li>If entity type is YARN_APPLICATION (i.e. query to fetch a
   *     specific app), only clusterId and appId are mandatory. If flow
   *     context information (userId, flowName, flowRunId) is not supplied,
   *     backend storage must first look it up and then fetch the app; if it
   *     is supplied, the app can be fetched directly.</li>
   *     <li>For other entity types (i.e. query to fetch a generic entity),
   *     clusterId, appId, entityType and entityId are mandatory. If flow
   *     context information is not supplied, backend storage must first look
   *     it up and then fetch the entity; if it is supplied, the entity can
   *     be queried directly.</li>
   *     </ul>
   * @param dataToRetrieve Specifies which data to retrieve for the entity.
   *     All dataToRetrieve fields are optional; refer to
   *     {@link TimelineDataToRetrieve} for details.
   * @return A <cite>TimelineEntity</cite> instance or null. The entity
   *     contains the metadata plus the requested fields.<br>
   *     If entityType is YARN_FLOW_RUN, the returned entity is of type
   *     <cite>FlowRunEntity</cite>; for all other entity types it is of type
   *     <cite>TimelineEntity</cite>.
   * @throws IOException if an error occurs while fetching the entity from
   *     backend storage.
   */
  TimelineEntity getEntity(TimelineReaderContext context,
      TimelineDataToRetrieve dataToRetrieve) throws IOException;

  /**
   * <p>Searches for entities of the given entity type, within the scope of
   * the given context, which match the given predicates. Predicates include
   * the created-time window, a limit on the number of entities returned, and
   * filters on info/config key-value pairs, metric values, event ids and
   * relates-to/is-related-to entities. A qualifying entity must satisfy all
   * entries of every multi-valued predicate.</p>
   *
   * @param context Context which defines the scope in which the query is
   *     made. Use getters of {@link TimelineReaderContext} to fetch context
   *     fields: entityType (mandatory), clusterId (mandatory), userId,
   *     flowName, flowRunId and appId. Although entityId is also part of the
   *     context, it has no meaning for getEntities.<br>
   *     Which additional fields are mandatory depends on the entity type:
   *     <ul>
   *     <li>If entity type is YARN_FLOW_ACTIVITY (i.e. query to fetch
   *     flows), only clusterId is mandatory.</li>
   *     <li>If entity type is YARN_FLOW_RUN (i.e. query to fetch flow runs),
   *     clusterId, userId and flowName are mandatory.</li>
   *     <li>If entity type is YARN_APPLICATION (i.e. query to fetch apps),
   *     apps can be queried either within the scope of a flow name
   *     (clusterId, userId and flowName supplied) or within the scope of a
   *     flow run (clusterId, userId, flowName and flowRunId supplied).</li>
   *     <li>For other entity types (i.e. query to fetch generic entities),
   *     only clusterId, appId and entityType are mandatory. If flow context
   *     information (userId, flowName, flowRunId) is not supplied, backend
   *     storage must first look it up and then fetch the entities; if it is
   *     supplied, the entities can be queried directly.</li>
   *     </ul>
   * @param filters Specifies filters which restrict the entities returned.
   *     All filters are optional; refer to {@link TimelineEntityFilters} for
   *     details.
   * @param dataToRetrieve Specifies which data to retrieve for each entity.
   *     All dataToRetrieve fields are optional; refer to
   *     {@link TimelineDataToRetrieve} for details.
   * @return A set of <cite>TimelineEntity</cite> instances of the given
   *     entity type in the given context scope which match the given
   *     predicates, ordered by created time, descending. Each entity
   *     contains its metadata (id, type and created time) plus the requested
   *     fields.<br>
   *     If entityType is YARN_FLOW_ACTIVITY, the returned entities are of
   *     type <cite>FlowActivityEntity</cite>; if YARN_FLOW_RUN, of type
   *     <cite>FlowRunEntity</cite>; for all other entity types, of type
   *     <cite>TimelineEntity</cite>.
   * @throws IOException if an error occurs while fetching entities from
   *     backend storage.
   */
  Set<TimelineEntity> getEntities(
      TimelineReaderContext context,
      TimelineEntityFilters filters,
      TimelineDataToRetrieve dataToRetrieve) throws IOException;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This creates the schema for a hbase based backend for 
storing application + * timeline information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class TimelineSchemaCreator { + private TimelineSchemaCreator() { + } + + final static String NAME = TimelineSchemaCreator.class.getSimpleName(); + private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class); + private static final String PHOENIX_OPTION_SHORT = "p"; + private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; + private static final String APP_TABLE_NAME_SHORT = "a"; + private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; + private static final String TTL_OPTION_SHORT = "m"; + private static final String ENTITY_TABLE_NAME_SHORT = "e"; + + public static void main(String[] args) throws Exception { + + Configuration hbaseConf = HBaseConfiguration.create(); + // Grab input args and allow for -Dxyz style arguments + String[] otherArgs = new GenericOptionsParser(hbaseConf, args) + .getRemainingArgs(); + + // Grab the arguments we're looking for. 
+ CommandLine commandLine = parseArgs(otherArgs); + + // Grab the entityTableName argument + String entityTableName + = commandLine.getOptionValue(ENTITY_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(entityTableName)) { + hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName); + } + String entityTableTTLMetrics = commandLine.getOptionValue(TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(entityTableTTLMetrics)) { + int metricsTTL = Integer.parseInt(entityTableTTLMetrics); + new EntityTable().setMetricsTTL(metricsTTL, hbaseConf); + } + // Grab the appToflowTableName argument + String appToflowTableName = commandLine.getOptionValue( + APP_TO_FLOW_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(appToflowTableName)) { + hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName); + } + // Grab the applicationTableName argument + String applicationTableName = commandLine.getOptionValue( + APP_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(applicationTableName)) { + hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME, + applicationTableName); + } + + List<Exception> exceptions = new ArrayList<>(); + try { + boolean skipExisting + = commandLine.hasOption(SKIP_EXISTING_TABLE_OPTION_SHORT); + if (skipExisting) { + LOG.info("Will skip existing tables and continue on htable creation " + + "exceptions!"); + } + createAllTables(hbaseConf, skipExisting); + LOG.info("Successfully created HBase schema. 
"); + } catch (IOException e) { + LOG.error("Error in creating hbase tables: " + e.getMessage()); + exceptions.add(e); + } + + // Create Phoenix data schema if needed + if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) { + Configuration phoenixConf = new Configuration(); + try { + PhoenixOfflineAggregationWriterImpl phoenixWriter = + new PhoenixOfflineAggregationWriterImpl(); + phoenixWriter.init(phoenixConf); + phoenixWriter.start(); + phoenixWriter.createPhoenixTables(); + phoenixWriter.stop(); + LOG.info("Successfully created Phoenix offline aggregation schema. "); + } catch (IOException e) { + LOG.error("Error in creating phoenix tables: " + e.getMessage()); + exceptions.add(e); + } + } + if (exceptions.size() > 0) { + LOG.warn("Schema creation finished with the following exceptions"); + for (Exception e : exceptions) { + LOG.warn(e.getMessage()); + } + System.exit(-1); + } else { + LOG.info("Schema creation finished successfully"); + } + } + + /** + * Parse command-line arguments. + * + * @param args + * command line arguments passed to program. + * @return parsed command line. 
+ * @throws ParseException + */ + private static CommandLine parseArgs(String[] args) throws ParseException { + Options options = new Options(); + + // Input + Option o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true, + "entity table name"); + o.setArgName("entityTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(TTL_OPTION_SHORT, "metricsTTL", true, + "TTL for metrics column family"); + o.setArgName("metricsTTL"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true, + "app to flow table name"); + o.setArgName("appToflowTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true, + "application table name"); + o.setArgName("applicationTableName"); + o.setRequired(false); + options.addOption(o); + + // Options without an argument + // No need to set arg name since we do not need an argument here + o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false, + "create Phoenix offline aggregation tables"); + o.setRequired(false); + options.addOption(o); + + o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", + false, "skip existing Hbase tables and continue to create new tables"); + o.setRequired(false); + options.addOption(o); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + } catch (Exception e) { + LOG.error("ERROR: " + e.getMessage() + "\n"); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(NAME + " ", options, true); + System.exit(-1); + } + + return commandLine; + } + + @VisibleForTesting + public static void createAllTables(Configuration hbaseConf, + boolean skipExisting) throws IOException { + + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new 
IOException("Cannot create table since admin is null"); + } + try { + new EntityTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new AppToFlowTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new ApplicationTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowRunTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowActivityTable().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + } finally { + if (conn != null) { + conn.close(); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java new file mode 100644 index 0000000..663a18a --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;

/**
 * Backend-agnostic contract for persisting application timeline data.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface TimelineWriter extends Service {

  /**
   * Persists every entity contained in the given {@link TimelineEntities}
   * bundle to the timeline store, under the supplied context. Failures that
   * affect individual write request objects are reported through the
   * returned response rather than thrown.
   *
   * @param clusterId context cluster ID
   * @param userId context user ID
   * @param flowName context flow name
   * @param flowVersion context flow version
   * @param flowRunId run id for the flow
   * @param appId context app ID
   * @param data the {@link TimelineEntities} payload to store
   * @return a {@link TimelineWriteResponse} carrying per-entity errors
   * @throws IOException if there is any exception encountered while storing
   *           or writing entities to the backend storage.
   */
  TimelineWriteResponse write(String clusterId, String userId,
      String flowName, String flowVersion, long flowRunId, String appId,
      TimelineEntities data) throws IOException;

  /**
   * Rolls the given entity up along the requested aggregation track and
   * stores the result; the available tracks are given by
   * {@link TimelineAggregationTrack}. Failures affecting individual write
   * request objects are reported through the returned response.
   *
   * @param data the {@link TimelineEntity} object to aggregate
   * @param track the {@link TimelineAggregationTrack} dimension along which
   *          aggregation occurs (USER, FLOW, QUEUE, etc.)
   * @return a {@link TimelineWriteResponse} carrying per-entity errors
   * @throws IOException if there is any exception encountered while
   *           aggregating entities to the backend storage.
   */
  TimelineWriteResponse aggregate(TimelineEntity data,
      TimelineAggregationTrack track) throws IOException;

  /**
   * Forces any buffered data out to the backend storage before returning.
   * Potentially time-consuming; use judiciously.
   *
   * @throws IOException if there is any exception encountered while flushing
   *           entities to the backend storage.
   */
  void flush() throws IOException;
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;

/**
 * Enumerates the fully qualified columns of the {@link ApplicationTable}.
 */
public enum ApplicationColumn implements Column<ApplicationTable> {

  /**
   * App id.
   */
  ID(ApplicationColumnFamily.INFO, "id"),

  /**
   * When the application was created.
   */
  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
      new LongConverter()),

  /**
   * The version of the flow that this app belongs to.
   */
  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");

  // Helper that performs the actual reads/writes with the right converter.
  private final ColumnHelper<ApplicationTable> column;
  private final ColumnFamily<ApplicationTable> columnFamily;
  private final String columnQualifier;
  private final byte[] columnQualifierBytes;

  ApplicationColumn(ColumnFamily<ApplicationTable> family,
      String qualifier) {
    // Values with no dedicated converter fall back to the generic one.
    this(family, qualifier, GenericConverter.getInstance());
  }

  ApplicationColumn(ColumnFamily<ApplicationTable> family,
      String qualifier, ValueConverter converter) {
    this.columnFamily = family;
    this.columnQualifier = qualifier;
    // Future-proof by ensuring the right column prefix hygiene.
    this.columnQualifierBytes =
        Bytes.toBytes(Separator.SPACE.encode(qualifier));
    this.column = new ColumnHelper<ApplicationTable>(family, converter);
  }

  /**
   * @return the column name value
   */
  private String getColumnQualifier() {
    return columnQualifier;
  }

  public void store(byte[] rowKey,
      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
      Object inputValue, Attribute... attributes) throws IOException {
    // Delegate to the helper, which applies this column's converter.
    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
        inputValue, attributes);
  }

  public Object readResult(Result result) throws IOException {
    return column.readResult(result, columnQualifierBytes);
  }

  @Override
  public byte[] getColumnQualifierBytes() {
    // Defensive copy so callers cannot mutate our cached qualifier bytes.
    return columnQualifierBytes.clone();
  }

  @Override
  public byte[] getColumnFamilyBytes() {
    return columnFamily.getBytes();
  }

  @Override
  public ValueConverter getValueConverter() {
    return column.getValueConverter();
  }

  /**
   * Retrieve an {@link ApplicationColumn} given a name, or null if there is
   * no match. The following holds true: {@code columnFor(x) == columnFor(y)}
   * if and only if {@code x.equals(y)} or {@code (x == y == null)}.
   *
   * @param columnQualifier Name of the column to retrieve
   * @return the corresponding {@link ApplicationColumn} or null
   */
  public static ApplicationColumn columnFor(String columnQualifier) {
    // Match on qualifier only; the column family is assumed to agree.
    for (ApplicationColumn candidate : ApplicationColumn.values()) {
      if (candidate.getColumnQualifier().equals(columnQualifier)) {
        return candidate;
      }
    }
    // No enum constant carries that qualifier.
    return null;
  }

  /**
   * Retrieve an {@link ApplicationColumn} given a name, or null if there is
   * no match. The following holds true:
   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
   * {@code a.equals(b) & x.equals(y)} or {@code (x == y == null)}
   *
   * @param columnFamily The columnFamily for which to retrieve the column.
   * @param name Name of the column to retrieve
   * @return the corresponding {@link ApplicationColumn} or null if both
   *         arguments don't match.
   */
  public static ApplicationColumn columnFor(
      ApplicationColumnFamily columnFamily, String name) {
    // Both the family and the qualifier must match.
    for (ApplicationColumn candidate : ApplicationColumn.values()) {
      boolean familyMatches = candidate.columnFamily.equals(columnFamily);
      if (familyMatches && candidate.getColumnQualifier().equals(name)) {
        return candidate;
      }
    }
    // No enum constant matches both arguments.
    return null;
  }

}