YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f998f47 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f998f47 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f998f47 Branch: refs/heads/YARN-2928-rebase Commit: 6f998f47905d1c3603d26e39e67cfbc834281c47 Parents: e1f3032 Author: Vrushali <vrush...@apache.org> Authored: Tue Sep 22 13:42:30 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Mon Nov 9 16:13:14 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../timelineservice/FlowActivityEntity.java | 183 ++++++++ .../api/records/timelineservice/FlowEntity.java | 103 ----- .../records/timelineservice/FlowRunEntity.java | 121 ++++++ .../timelineservice/TimelineEntityType.java | 31 +- .../TestTimelineServiceRecords.java | 14 +- .../TestTimelineServiceClientIntegration.java | 2 +- .../collector/TimelineCollectorWebService.java | 6 +- .../storage/ApplicationEntityReader.java | 229 ++++++++++ .../storage/FlowActivityEntityReader.java | 168 +++++++ .../storage/FlowRunEntityReader.java | 136 ++++++ .../storage/GenericEntityReader.java | 389 +++++++++++++++++ .../storage/HBaseTimelineReaderImpl.java | 434 +------------------ .../storage/TimelineEntityReader.java | 223 ++++++++++ .../storage/TimelineEntityReaderFactory.java | 97 +++++ .../storage/application/ApplicationRowKey.java | 68 ++- .../storage/apptoflow/AppToFlowRowKey.java | 31 ++ .../storage/common/BaseTable.java | 3 +- .../storage/entity/EntityRowKey.java | 76 +++- .../storage/flow/FlowActivityRowKey.java | 7 +- .../storage/flow/FlowRunRowKey.java | 50 ++- .../storage/flow/FlowScanner.java | 18 +- .../storage/TestHBaseTimelineStorage.java | 34 +- .../storage/flow/TestFlowDataGenerator.java | 39 +- .../flow/TestHBaseStorageFlowActivity.java | 131 ++++-- .../storage/flow/TestHBaseStorageFlowRun.java | 105 +++-- 26 files changed, 2021 
insertions(+), 680 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6f5c8d6..fc8a0e9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -106,6 +106,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee) + YARN-4074. [timeline reader] implement support for querying for flows + and flow runs (sjlee via vrushali) + IMPROVEMENTS YARN-3276. Code cleanup for timeline service API records. (Junping Du via http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java new file mode 100644 index 0000000..163bd5c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import java.util.Collection; +import java.util.Date; +import java.util.NavigableSet; +import java.util.TreeSet; + +import javax.xml.bind.annotation.XmlElement; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Entity that represents a record for flow activity. It's essentially a + * container entity for flow runs with limited information. + */ +@Public +@Unstable +public class FlowActivityEntity extends TimelineEntity { + public static final String CLUSTER_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "CLUSTER"; + public static final String DATE_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "DATE"; + public static final String USER_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER"; + public static final String FLOW_NAME_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME"; + + private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>(); + + public FlowActivityEntity() { + super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString()); + // set config to null + setConfigs(null); + } + + public FlowActivityEntity(String cluster, long time, String user, + String flowName) { + this(); + setCluster(cluster); + setDate(time); + setUser(user); + setFlowName(flowName); + } + + public FlowActivityEntity(TimelineEntity entity) { + super(entity); + if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) { + throw new 
IllegalArgumentException("Incompatible entity type: " + + getId()); + } + // set config to null + setConfigs(null); + } + + @XmlElement(name = "id") + @Override + public String getId() { + // flow activity: cluster/day/user@flow_name + String id = super.getId(); + if (id == null) { + StringBuilder sb = new StringBuilder(); + sb.append(getCluster()); + sb.append('/'); + sb.append(getDate().getTime()); + sb.append('/'); + sb.append(getUser()); + sb.append('@'); + sb.append(getFlowName()); + id = sb.toString(); + setId(id); + } + return id; + } + + @Override + public int compareTo(TimelineEntity entity) { + int comparison = getType().compareTo(entity.getType()); + if (comparison == 0) { + // order by cluster, date (descending), user, and flow name + FlowActivityEntity other = (FlowActivityEntity)entity; + int clusterComparison = getCluster().compareTo(other.getCluster()); + if (clusterComparison != 0) { + return clusterComparison; + } + int dateComparisonDescending = + (int)(other.getDate().getTime() - getDate().getTime()); // descending + if (dateComparisonDescending != 0) { + return dateComparisonDescending; // descending + } + int userComparison = getUser().compareTo(other.getUser()); + if (userComparison != 0) { + return userComparison; + } + return getFlowName().compareTo(other.getFlowName()); + } else { + return comparison; + } + } + + /** + * Reuse the base class equals method. + */ + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + /** + * Reuse the base class hashCode method. 
+ */ + @Override + public int hashCode() { + return super.hashCode(); + } + + public String getCluster() { + return (String)getInfo().get(CLUSTER_INFO_KEY); + } + + public void setCluster(String cluster) { + addInfo(CLUSTER_INFO_KEY, cluster); + } + + public Date getDate() { + return (Date)getInfo().get(DATE_INFO_KEY); + } + + public void setDate(long time) { + Date date = new Date(time); + addInfo(DATE_INFO_KEY, date); + } + + public String getUser() { + return (String)getInfo().get(USER_INFO_KEY); + } + + public void setUser(String user) { + addInfo(USER_INFO_KEY, user); + } + + public String getFlowName() { + return (String)getInfo().get(FLOW_NAME_INFO_KEY); + } + + public void setFlowName(String flowName) { + addInfo(FLOW_NAME_INFO_KEY, flowName); + } + + public void addFlowRun(FlowRunEntity run) { + flowRuns.add(run); + } + + public void addFlowRuns(Collection<FlowRunEntity> runs) { + flowRuns.addAll(runs); + } + + @XmlElement(name = "flowruns") + public NavigableSet<FlowRunEntity> getFlowRuns() { + return flowRuns; + } + + public int getNumberOfRuns() { + return flowRuns.size(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java deleted file mode 100644 index 4554778..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.api.records.timelineservice; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import javax.xml.bind.annotation.XmlElement; - -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class FlowEntity extends HierarchicalTimelineEntity { - public static final String USER_INFO_KEY = - TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER"; - public static final String FLOW_NAME_INFO_KEY = - TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME"; - public static final String FLOW_VERSION_INFO_KEY = - TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION"; - public static final String FLOW_RUN_ID_INFO_KEY = - TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID"; - - public FlowEntity() { - super(TimelineEntityType.YARN_FLOW.toString()); - } - - public FlowEntity(TimelineEntity entity) { - super(entity); - if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) { - throw new IllegalArgumentException("Incompatible entity type: " + getId()); - } - } - - @XmlElement(name = "id") - @Override - public String getId() { - //Flow id schema: user@flow_name(or id)/version/run_id - String id = super.getId(); - if (id == null) { - StringBuilder sb = new StringBuilder(); - 
sb.append(getInfo().get(USER_INFO_KEY).toString()); - sb.append('@'); - sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString()); - sb.append('/'); - sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString()); - sb.append('/'); - sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString()); - id = sb.toString(); - setId(id); - } - return id; - } - - public String getUser() { - Object user = getInfo().get(USER_INFO_KEY); - return user == null ? null : user.toString(); - } - - public void setUser(String user) { - addInfo(USER_INFO_KEY, user); - } - - public String getName() { - Object name = getInfo().get(FLOW_NAME_INFO_KEY); - return name == null ? null : name.toString(); - } - - public void setName(String name) { - addInfo(FLOW_NAME_INFO_KEY, name); - } - - public String getVersion() { - Object version = getInfo().get(FLOW_VERSION_INFO_KEY); - return version == null ? null : version.toString(); - } - - public void setVersion(String version) { - addInfo(FLOW_VERSION_INFO_KEY, version); - } - - public long getRunId() { - Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY); - return runId == null ? 
0L : (Long) runId; - } - - public void setRunId(long runId) { - addInfo(FLOW_RUN_ID_INFO_KEY, runId); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java new file mode 100644 index 0000000..3c3ffb4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import javax.xml.bind.annotation.XmlElement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class FlowRunEntity extends HierarchicalTimelineEntity { + public static final String USER_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER"; + public static final String FLOW_NAME_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME"; + public static final String FLOW_VERSION_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION"; + public static final String FLOW_RUN_ID_INFO_KEY = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID"; + public static final String FLOW_RUN_END_TIME = + TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME"; + + public FlowRunEntity() { + super(TimelineEntityType.YARN_FLOW_RUN.toString()); + // set config to null + setConfigs(null); + } + + public FlowRunEntity(TimelineEntity entity) { + super(entity); + if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) { + throw new IllegalArgumentException("Incompatible entity type: " + getId()); + } + // set config to null + setConfigs(null); + } + + @XmlElement(name = "id") + @Override + public String getId() { + //Flow id schema: user@flow_name(or id)/run_id + String id = super.getId(); + if (id == null) { + StringBuilder sb = new StringBuilder(); + sb.append(getInfo().get(USER_INFO_KEY).toString()); + sb.append('@'); + sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString()); + sb.append('/'); + sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString()); + id = sb.toString(); + setId(id); + } + return id; + } + + public String getUser() { + return (String)getInfo().get(USER_INFO_KEY); + } + + public void setUser(String user) { + addInfo(USER_INFO_KEY, user); + } + + public String getName() { + return 
(String)getInfo().get(FLOW_NAME_INFO_KEY); + } + + public void setName(String name) { + addInfo(FLOW_NAME_INFO_KEY, name); + } + + public String getVersion() { + return (String)getInfo().get(FLOW_VERSION_INFO_KEY); + } + + public void setVersion(String version) { + addInfo(FLOW_VERSION_INFO_KEY, version); + } + + public long getRunId() { + Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY); + return runId == null ? 0L : (Long) runId; + } + + public void setRunId(long runId) { + addInfo(FLOW_RUN_ID_INFO_KEY, runId); + } + + public long getStartTime() { + return getCreatedTime(); + } + + public void setStartTime(long startTime) { + setCreatedTime(startTime); + } + + public long getMaxEndTime() { + Object time = getInfo().get(FLOW_RUN_END_TIME); + return time == null ? 0L : (Long)time; + } + + public void setMaxEndTime(long endTime) { + addInfo(FLOW_RUN_END_TIME, endTime); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java index 6062fe1..ba32e20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java @@ -24,21 +24,25 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public enum TimelineEntityType { YARN_CLUSTER, - YARN_FLOW, + YARN_FLOW_RUN, YARN_APPLICATION, YARN_APPLICATION_ATTEMPT, 
YARN_CONTAINER, YARN_USER, - YARN_QUEUE; + YARN_QUEUE, + YARN_FLOW_ACTIVITY; + /** + * Whether the input type can be a parent of this entity. + */ public boolean isParent(TimelineEntityType type) { switch (this) { case YARN_CLUSTER: return false; - case YARN_FLOW: - return YARN_FLOW == type || YARN_CLUSTER == type; + case YARN_FLOW_RUN: + return YARN_FLOW_RUN == type || YARN_CLUSTER == type; case YARN_APPLICATION: - return YARN_FLOW == type || YARN_CLUSTER == type; + return YARN_FLOW_RUN == type || YARN_CLUSTER == type; case YARN_APPLICATION_ATTEMPT: return YARN_APPLICATION == type; case YARN_CONTAINER: @@ -50,12 +54,15 @@ public enum TimelineEntityType { } } + /** + * Whether the input type can be a child of this entity. + */ public boolean isChild(TimelineEntityType type) { switch (this) { case YARN_CLUSTER: - return YARN_FLOW == type || YARN_APPLICATION == type; - case YARN_FLOW: - return YARN_FLOW == type || YARN_APPLICATION == type; + return YARN_FLOW_RUN == type || YARN_APPLICATION == type; + case YARN_FLOW_RUN: + return YARN_FLOW_RUN == type || YARN_APPLICATION == type; case YARN_APPLICATION: return YARN_APPLICATION_ATTEMPT == type; case YARN_APPLICATION_ATTEMPT: @@ -68,4 +75,12 @@ public enum TimelineEntityType { return false; } } + + /** + * Whether the type of this entity matches the type indicated by the input + * argument. 
+ */ + public boolean matches(String typeString) { + return toString().equals(typeString); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 78943e0..7c9acf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -182,14 +182,14 @@ public class TestTimelineServiceRecords { ClusterEntity cluster = new ClusterEntity(); cluster.setId("test cluster id"); - FlowEntity flow1 = new FlowEntity(); + FlowRunEntity flow1 = new FlowRunEntity(); //flow1.setId("test flow id 1"); flow1.setUser(user.getId()); flow1.setName("test flow name 1"); flow1.setVersion("test flow version 1"); flow1.setRunId(1L); - FlowEntity flow2 = new FlowEntity(); + FlowRunEntity flow2 = new FlowRunEntity(); //flow2.setId("test flow run id 2"); flow2.setUser(user.getId()); flow2.setName("test flow name 2"); @@ -213,19 +213,19 @@ public class TestTimelineServiceRecords { ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1).toString()); - cluster.addChild(TimelineEntityType.YARN_FLOW.toString(), flow1.getId()); + cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId()); flow1 .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId()); - 
flow1.addChild(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); - flow2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow1.getId()); + flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); + flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId()); flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId()); flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId()); - app1.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); + app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttempt.getId()); appAttempt .setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId()); - app2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId()); + app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId()); appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(), container.getId()); container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 69031a2..5672759 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -107,7 +107,7 @@ public class TestTimelineServiceClientIntegration { client.start(); ClusterEntity cluster = new ClusterEntity(); cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID); - FlowEntity flow = new FlowEntity(); + FlowRunEntity flow = new FlowRunEntity(); flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); flow.setName("test_flow_name"); flow.setVersion("test_flow_version"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 42fa365..8f595e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEnti import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity; import 
org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -205,8 +205,8 @@ public class TimelineCollectorWebService { case YARN_CLUSTER: entitiesToReturn.addEntity(new ClusterEntity(entity)); break; - case YARN_FLOW: - entitiesToReturn.addEntity(new FlowEntity(entity)); + case YARN_FLOW_RUN: + entitiesToReturn.addEntity(new FlowRunEntity(entity)); break; case YARN_APPLICATION: entitiesToReturn.addEntity(new ApplicationEntity(entity)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java new file mode 100644 index 0000000..dfbc31d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; + +/** + * Timeline entity reader for application entities that are stored in the + * application table. 
+ */ +class ApplicationEntityReader extends GenericEntityReader { + private static final ApplicationTable APPLICATION_TABLE = + new ApplicationTable(); + + public ApplicationEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } + + public ApplicationEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link ApplicationTable}. + */ + protected BaseTable<?> getTable() { + return APPLICATION_TABLE; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, + appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected Iterable<Result> getResults(Configuration hbaseConf, + Connection conn) throws IOException { + // If getEntities() is called for an application, there can be at most + // one entity. If the entity passes the filter, it is returned. Otherwise, + // an empty set is returned. 
+ byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId, + flowRunId, appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + Result result = table.getResult(hbaseConf, conn, get); + TimelineEntity entity = parseEntity(result); + Set<Result> set; + if (entity != null) { + set = Collections.singleton(result); + } else { + set = Collections.emptySet(); + } + return set; + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + String entityId = ApplicationColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = + (Number)ApplicationColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = + (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch 
relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineReaderUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + if (checkInfo && + !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineReaderUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, true); + if (checkEvents && !TimelineReaderUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + 
!fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, ApplicationColumnPrefix.METRIC); + if (checkMetrics && !TimelineReaderUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java new file mode 100644 index 0000000..d5ece2e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow activity entities that are stored in the + * flow activity table. 
+ */ +class FlowActivityEntityReader extends TimelineEntityReader { + private static final FlowActivityTable FLOW_ACTIVITY_TABLE = + new FlowActivityTable(); + + public FlowActivityEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } + + public FlowActivityEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link FlowActivityTable}. + */ + @Override + protected BaseTable<?> getTable() { + return FLOW_ACTIVITY_TABLE; + } + + /** + * Since this is strictly sorted by the row key, it is sufficient to collect + * the first results as specified by the limit. 
+ */ + @Override + public Set<TimelineEntity> readEntities(Configuration hbaseConf, + Connection conn) throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + NavigableSet<TimelineEntity> entities = new TreeSet<>(); + Iterable<Result> results = getResults(hbaseConf, conn); + for (Result result : results) { + TimelineEntity entity = parseEntity(result); + if (entity == null) { + continue; + } + entities.add(entity); + if (entities.size() == limit) { + break; + } + } + return entities; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected Iterable<Result> getResults(Configuration hbaseConf, + Connection conn) throws IOException { + Scan scan = new Scan(); + scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + // use the page filter to limit the result to the page size + // the scanner may still return more than the limit; therefore we need to + // read the right number as we iterate + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); + + long time = rowKey.getDayTimestamp(); + String user = rowKey.getUserId(); + String flowName = rowKey.getFlowId(); + + FlowActivityEntity flowActivity = + new FlowActivityEntity(clusterId, time, user, flowName); + // set the id + flowActivity.setId(flowActivity.getId()); + // get the list of 
run ids along with the version that are associated with + // this flow on this day + Map<String, Object> runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result); + for (Map.Entry<String, Object> e : runIdsMap.entrySet()) { + Long runId = Long.valueOf(e.getKey()); + String version = (String)e.getValue(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(user); + flowRun.setName(flowName); + flowRun.setRunId(runId); + flowRun.setVersion(version); + // set the id + flowRun.setId(flowRun.getId()); + flowActivity.addFlowRun(flowRun); + } + + return flowActivity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java new file mode 100644 index 0000000..ced795d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. 
+ */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve); + } + + public FlowRunEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link FlowRunTable}. 
+ */ + @Override + protected BaseTable<?> getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); + Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) { + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected Iterable<Result> getResults(Configuration hbaseConf, + Connection conn) throws IOException { + throw new UnsupportedOperationException( + "multiple entity query is not supported"); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(userId); + flowRun.setName(flowId); + flowRun.setRunId(flowRunId); + + // read the start time + Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime); + } + // read the end time if available + Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime); + } + + // read the flow version + String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f998f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java new file mode 100644 index 0000000..466914b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;

import com.google.common.base.Preconditions;

/**
 * Timeline entity reader for generic entities that are stored in the entity
 * table.
 */
class GenericEntityReader extends TimelineEntityReader {
  private static final EntityTable ENTITY_TABLE = new EntityTable();
  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);

  // Defaults for the created/modified time-range filters when the caller did
  // not supply them: accept all timestamps.
  private static final long DEFAULT_BEGIN_TIME = 0L;
  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;

  /**
   * Used to look up the flow context.
   */
  private final AppToFlowTable appToFlowTable = new AppToFlowTable();

  /**
   * Constructor used for a multiple-entity query with filters.
   */
  public GenericEntityReader(String userId, String clusterId,
      String flowId, Long flowRunId, String appId, String entityType,
      Long limit, Long createdTimeBegin, Long createdTimeEnd,
      Long modifiedTimeBegin, Long modifiedTimeEnd,
      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
      Map<String, Object> infoFilters, Map<String, String> configFilters,
      Set<String> metricFilters, Set<String> eventFilters,
      EnumSet<Field> fieldsToRetrieve) {
    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
        eventFilters, fieldsToRetrieve);
  }

  /**
   * Constructor used for a single-entity query.
   */
  public GenericEntityReader(String userId, String clusterId,
      String flowId, Long flowRunId, String appId, String entityType,
      String entityId, EnumSet<Field> fieldsToRetrieve) {
    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
        fieldsToRetrieve);
  }

  /**
   * Uses the {@link EntityTable}.
   */
  @Override
  protected BaseTable<?> getTable() {
    return ENTITY_TABLE;
  }

  /**
   * Looks up the flow context (flow ID and flow run ID) for a given
   * application via the app-to-flow table.
   *
   * @throws IOException if no mapping row exists for the (cluster, app) pair
   */
  private FlowContext lookupFlowContext(String clusterId, String appId,
      Configuration hbaseConf, Connection conn) throws IOException {
    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
    Get get = new Get(rowKey);
    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
    if (result != null && !result.isEmpty()) {
      return new FlowContext(
          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
    } else {
      throw new IOException(
          "Unable to find the context flow ID and flow run ID for clusterId=" +
          clusterId + ", appId=" + appId);
    }
  }

  /** Immutable value holder for an application's (flowId, flowRunId) pair. */
  private static class FlowContext {
    private final String flowId;
    private final Long flowRunId;
    public FlowContext(String flowId, Long flowRunId) {
      this.flowId = flowId;
      this.flowRunId = flowRunId;
    }
  }

  @Override
  protected void validateParams() {
    Preconditions.checkNotNull(userId, "userId shouldn't be null");
    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
    Preconditions.checkNotNull(appId, "appId shouldn't be null");
    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
    if (singleEntityRead) {
      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
    }
  }

  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    // In reality both should be null or neither should be null
    if (flowId == null || flowRunId == null) {
      FlowContext context =
          lookupFlowContext(clusterId, appId, hbaseConf, conn);
      flowId = context.flowId;
      flowRunId = context.flowRunId;
    }
    if (fieldsToRetrieve == null) {
      fieldsToRetrieve = EnumSet.noneOf(Field.class);
    }
    // limit and time ranges only apply to multi-entity reads
    if (!singleEntityRead) {
      if (limit == null || limit < 0) {
        limit = TimelineReader.DEFAULT_LIMIT;
      }
      if (createdTimeBegin == null) {
        createdTimeBegin = DEFAULT_BEGIN_TIME;
      }
      if (createdTimeEnd == null) {
        createdTimeEnd = DEFAULT_END_TIME;
      }
      if (modifiedTimeBegin == null) {
        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
      }
      if (modifiedTimeEnd == null) {
        modifiedTimeEnd = DEFAULT_END_TIME;
      }
    }
  }

  @Override
  protected Result getResult(Configuration hbaseConf, Connection conn)
      throws IOException {
    byte[] rowKey =
        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
            entityType, entityId);
    Get get = new Get(rowKey);
    get.setMaxVersions(Integer.MAX_VALUE);
    return table.getResult(hbaseConf, conn, get);
  }

  @Override
  protected Iterable<Result> getResults(Configuration hbaseConf,
      Connection conn) throws IOException {
    // Scan through part of the table to find the entities belong to one app
    // and one type
    Scan scan = new Scan();
    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
        clusterId, userId, flowId, flowRunId, appId, entityType));
    scan.setMaxVersions(Integer.MAX_VALUE);
    return table.getResultScanner(hbaseConf, conn, scan);
  }

  /**
   * Converts an entity table row into a {@link TimelineEntity}.
   *
   * <p>Each field group follows the same pattern: the group is read if it is
   * explicitly requested via fieldsToRetrieve OR a filter on that group was
   * supplied; if a supplied filter does not match, the whole entity is
   * rejected (null is returned); if the group was read only to evaluate a
   * filter and was not actually requested, it is cleared before returning.
   * Time-range checks apply only to multi-entity reads (singleEntityRead is
   * false).
   *
   * @param result the HBase row; may be null or empty, in which case null is
   *        returned
   * @return the parsed entity, or null if the row is empty or fails a filter
   */
  @Override
  protected TimelineEntity parseEntity(Result result) throws IOException {
    if (result == null || result.isEmpty()) {
      return null;
    }
    TimelineEntity entity = new TimelineEntity();
    // NOTE(review): assumes the TYPE/ID and created/modified time columns are
    // always present in a non-empty row — an NPE results otherwise; confirm
    // against the writer.
    String entityType = EntityColumn.TYPE.readResult(result).toString();
    entity.setType(entityType);
    String entityId = EntityColumn.ID.readResult(result).toString();
    entity.setId(entityId);

    // fetch created time
    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
    entity.setCreatedTime(createdTime.longValue());
    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
        entity.getCreatedTime() > createdTimeEnd)) {
      return null;
    }

    // fetch modified time
    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
    entity.setModifiedTime(modifiedTime.longValue());
    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
        entity.getModifiedTime() > modifiedTimeEnd)) {
      return null;
    }

    // fetch is related to entities
    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
    if (fieldsToRetrieve.contains(Field.ALL) ||
        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
          entity.getIsRelatedToEntities(), isRelatedTo)) {
        return null;
      }
      if (!fieldsToRetrieve.contains(Field.ALL) &&
          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
        entity.getIsRelatedToEntities().clear();
      }
    }

    // fetch relates to entities
    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
    if (fieldsToRetrieve.contains(Field.ALL) ||
        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
          entity.getRelatesToEntities(), relatesTo)) {
        return null;
      }
      if (!fieldsToRetrieve.contains(Field.ALL) &&
          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
        entity.getRelatesToEntities().clear();
      }
    }

    // fetch info
    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
    if (fieldsToRetrieve.contains(Field.ALL) ||
        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
      if (checkInfo &&
          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
        return null;
      }
      if (!fieldsToRetrieve.contains(Field.ALL) &&
          !fieldsToRetrieve.contains(Field.INFO)) {
        entity.getInfo().clear();
      }
    }

    // fetch configs
    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
    if (fieldsToRetrieve.contains(Field.ALL) ||
        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
      if (checkConfigs && !TimelineReaderUtils.matchFilters(
          entity.getConfigs(), configFilters)) {
        return null;
      }
      if (!fieldsToRetrieve.contains(Field.ALL) &&
          !fieldsToRetrieve.contains(Field.CONFIGS)) {
        entity.getConfigs().clear();
      }
    }

    // fetch events
    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
    if (fieldsToRetrieve.contains(Field.ALL) ||
        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
      // false: read from the entity table's event column layout
      readEvents(entity, result, false);
      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
          entity.getEvents(), eventFilters)) {
        return null;
      }
      if (!fieldsToRetrieve.contains(Field.ALL) &&
          !fieldsToRetrieve.contains(Field.EVENTS)) {
        entity.getEvents().clear();
      }
    }

    // fetch metrics
    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
    if (fieldsToRetrieve.contains(Field.ALL) ||
        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
      readMetrics(entity, result, EntityColumnPrefix.METRIC);
      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
          entity.getMetrics(), metricFilters)) {
        return null;
      }
      if (!fieldsToRetrieve.contains(Field.ALL) &&
          !fieldsToRetrieve.contains(Field.METRICS)) {
        entity.getMetrics().clear();
      }
    }
    return entity;
  }

  /**
   * Helper method for reading relationship.
   *
   * @param isRelatedTo true to populate isRelatedTo, false for relatesTo
   */
  protected <T> void readRelationship(
      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
      boolean isRelatedTo) throws IOException {
    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
    Map<String, Object> columns = prefix.readResults(result);
    for (Map.Entry<String, Object> column : columns.entrySet()) {
      // the cell value encodes multiple related-entity ids joined together
      for (String id : Separator.VALUES.splitEncoded(
          column.getValue().toString())) {
        if (isRelatedTo) {
          entity.addIsRelatedToEntity(column.getKey(), id);
        } else {
          entity.addRelatesToEntity(column.getKey(), id);
        }
      }
    }
  }

  /**
   * Helper method for reading key-value pairs for either info or config.
   *
   * @param isConfig true to populate configs (values as strings), false for
   *        info (values as raw objects)
   */
  protected <T> void readKeyValuePairs(
      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
      boolean isConfig) throws IOException {
    // info and configuration are of type Map<String, Object or String>
    Map<String, Object> columns = prefix.readResults(result);
    if (isConfig) {
      for (Map.Entry<String, Object> column : columns.entrySet()) {
        entity.addConfig(column.getKey(), column.getValue().toString());
      }
    } else {
      entity.addInfo(columns);
    }
  }

  /**
   * Read events from the entity table or the application table. The column name
   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
   * if there is no info associated with the event.
   *
   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
   * schema description.
   */
  protected void readEvents(TimelineEntity entity, Result result,
      boolean isApplication) throws IOException {
    Map<String, TimelineEvent> eventsMap = new HashMap<>();
    Map<?, Object> eventsResult = isApplication ?
        ApplicationColumnPrefix.EVENT.
            readResultsHavingCompoundColumnQualifiers(result) :
        EntityColumnPrefix.EVENT.
            readResultsHavingCompoundColumnQualifiers(result);
    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
      byte[][] karr = (byte[][])eventResult.getKey();
      // the column name is of the form "eventId=timestamp=infoKey"
      if (karr.length == 3) {
        String id = Bytes.toString(karr[0]);
        // timestamps are stored inverted so that scans return newest first
        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
        // events with several info keys share one TimelineEvent, keyed by
        // (id, timestamp)
        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
        TimelineEvent event = eventsMap.get(key);
        if (event == null) {
          event = new TimelineEvent();
          event.setId(id);
          event.setTimestamp(ts);
          eventsMap.put(key, event);
        }
        // handle empty info
        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
        if (infoKey != null) {
          event.addInfo(infoKey, eventResult.getValue());
        }
      } else {
        LOG.warn("incorrectly formatted column name: it will be discarded");
      }
    }
    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
    entity.addEvents(eventsSet);
  }
}