#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

# Origin: ambari-metrics/ambari-metrics-timelineservice/src/main/resources/
#         scripts/ams_query.py
#
# Repeatedly queries the AMS timeline metrics REST endpoint for a fixed set of
# metric groups, measures how long every query takes and prints a timing
# summary when interrupted with Ctrl-C.

import contextlib
import optparse
import signal
import socket
import sys
import time

try:  # Python 2
    from urllib2 import Request, URLError, urlopen
except ImportError:  # Python 3: same API, split across two modules
    from urllib.error import URLError
    from urllib.request import Request, urlopen

# Example of a generated query:
# http://162.216.148.45:8188/ws/v1/timeline/metrics?
# metricNames=rpc.rpc.RpcAuthenticationSuccesses
# &appId=nodemanager&hostname=local.0&startTime=1414152029&endTime=1414155629

# NOTE: the query parameter is spelled "appId" (as in the example above);
# the previous lowercase "appid" spelling did not match it.
AMS_URL = ("http://{0}:8188/ws/v1/timeline/metrics"
           "?metricNames={1}&appId={2}&hostname={3}")

QUERY_WINDOW_SECS = 3600   # endTime = startTime + this window
POLL_INTERVAL_SECS = 15    # pause between two polling rounds
HTTP_TIMEOUT_SECS = 30     # per-request timeout

# in fact it can be list automatically generated from ambari
# UI queries
host_metrics = {
    'cpu': ['cpu_user', 'cpu_wio', 'cpu_nice', 'cpu_aidle', 'cpu_system',
            'cpu_idle'],
    'disk': ['disk_total', 'disk_free'],
    'load': ['load_one', 'load_fifteen', 'load_five'],
    'mem': ['swap_free', 'mem_shared', 'mem_free', 'mem_cached',
            'mem_buffers'],
    'network': ['bytes_in', 'bytes_out', 'pkts_in', 'pkts_out'],
    'process': ['proc_total', 'proc_run'],
}

# HDFS_SERVICE
namenode_metrics = {
    'dfs.Capacity': ['dfs.FSNamesystem.CapacityRemainingGB',
                     'dfs.FSNamesystem.CapacityUsedGB',
                     'dfs.FSNamesystem.CapacityTotalGB'],
    'dfs.Replication': ['dfs.FSNamesystem.PendingReplicationBlocks',
                        'dfs.FSNamesystem.UnderReplicatedBlocks'],
    'dfs.File': ['dfs.namenode.FileInfoOps', 'dfs.namenode.CreateFileOps'],
    'jvm.gc': ['jvm.JvmMetrics.GcTimeMillis'],
    'jvm.mem': ['jvm.JvmMetrics.MemNonHeapUsedM',
                'jvm.JvmMetrics.MemNonHeapCommittedM',
                'jvm.JvmMetrics.MemHeapUsedM',
                'jvm.JvmMetrics.MemHeapCommittedM'],
    'jvm.thread': ['jvm.JvmMetrics.ThreadsRunnable',
                   'jvm.JvmMetrics.ThreadsBlocked',
                   'jvm.JvmMetrics.ThreadsWaiting',
                   'jvm.JvmMetrics.ThreadsTimedWaiting'],
    'rpc': ['rpc.rpc.RpcQueueTimeAvgTime'],
}

# app id -> {metric group name -> list of metric names}
all_metrics = {
    'HOST': host_metrics,
    'namenode': namenode_metrics,
}

# app id -> list of {metric group name -> elapsed seconds}, one dict per round
all_metrics_times = {}

# Run-time settings; main() fills them in from the command line and run()
# reads them. They are defined here so the module can be imported safely.
metrics_test_host = None
hostnames = []
start_time_secs = None


def main(argv=None):
    """Parse the command line and poll AMS until interrupted.

    argv -- list of arguments without the program name; None (the default)
            makes optparse read sys.argv[1:].

    Exits with status -1 when --host is missing and 3 when --nodes is
    missing. Otherwise it never returns: it loops until SIGINT, whose handler
    prints the timing summary and exits with status 0.
    """
    parser = optparse.OptionParser()
    parser.add_option("-H", "--host", dest="host",
                      help="AMS host")
    # type="int" lets optparse reject a non-numeric value with a usage error
    # instead of a ValueError traceback.
    parser.add_option("-t", "--starttime", dest="start_time_secs", type="int",
                      default=int(time.time()),
                      help="start time in seconds, default value is current "
                           "time")
    parser.add_option("-n", "--nodes", dest="node_names",
                      help="nodes from cluster, used as a param to query for")
    (options, args) = parser.parse_args(argv)

    if options.host is None:
        # "-h" is optparse's built-in help flag; the host option is "-H".
        sys.stderr.write("AMS host name is required (--host or -H)\n")
        sys.exit(-1)

    if options.node_names is None:
        sys.stderr.write("cluster nodes are required (--nodes or -n)\n")
        sys.exit(3)

    global start_time_secs, metrics_test_host, hostnames

    metrics_test_host = options.host
    start_time_secs = options.start_time_secs
    hostnames = [options.node_names]

    # Allow Ctrl-C
    signal.signal(signal.SIGINT, signal_handler)

    while True:
        run()
        time.sleep(POLL_INTERVAL_SECS)
        start_time_secs += POLL_INTERVAL_SECS


def signal_handler(signal, frame):
    """SIGINT handler: print the collected timings and exit with status 0.

    The first parameter keeps its historical name and therefore shadows the
    signal module inside this function; the module is not used here.
    """
    print('Exiting, Ctrl+C press detected!')
    print_all_metrics(all_metrics_times)
    sys.exit(0)


def run():
    """Run one polling round: query every metric group of every app id.

    Reads the module-level settings (metrics_test_host, hostnames,
    start_time_secs) that main() initialises.
    """
    hostname = ','.join(hostnames)
    sender = QuerySender(metrics_test_host, True)
    # sorted() keeps the output order stable between runs.
    for app_id in sorted(all_metrics):
        print('')
        print('Querying for ' + app_id + ' metrics')
        sender.query_all_app_metrics(hostname, app_id, all_metrics[app_id],
                                     start_time_secs)


def add_query_metrics_for_app_id(app_id, metric_timing):
    """Append one round of timings for app_id to all_metrics_times."""
    all_metrics_times.setdefault(app_id, []).append(metric_timing)


def print_all_metrics(metrics):
    """Print every recorded timing row, grouped by app id.

    metrics -- mapping of app id to a list of timing dicts. A header row with
               the metric group names is printed before the first row of each
               app id.
    """
    print('Metrics Summary')
    for app_id in sorted(metrics):
        for index, single_query_metrics in enumerate(metrics[app_id]):
            print_app_metrics(app_id, single_query_metrics, index == 0)


def print_app_metrics(app_id, metric_timing, header=False):
    """Print one comma-separated row of timings, prefixed with the app id.

    app_id        -- label printed at the start of every row.
    metric_timing -- mapping of metric group name to elapsed seconds.
    header        -- when true, first print a row with the sorted group names.

    Values are printed in the same sorted-key order and with the same
    separator as the header, so the two rows line up column by column.
    """
    keys = sorted(metric_timing)
    if header:
        print(app_id + ': ' + ','.join(keys))
    print(app_id + ': ' + ','.join('%.3f' % metric_timing[key]
                                   for key in keys))


class QuerySender(object):
    """Builds AMS metric queries, sends them and records their duration."""

    def __init__(self, metrics_address, print_responses=False):
        """
        metrics_address -- host name or address of the AMS server.
        print_responses -- when true, response bodies and per-round timings
                           are printed.
        """
        self.metrics_address = metrics_address
        self.print_responses = print_responses

    def query_all_app_metrics(self, hostname, app_id, metrics,
                              current_time_secs):
        """Send one query per metric group and record how long each took.

        hostname          -- comma-separated host names to query for.
        app_id            -- application id the metrics belong to.
        metrics           -- mapping of group name to list of metric names.
        current_time_secs -- start of the queried time range.

        The timings are stored through add_query_metrics_for_app_id().
        """
        metric_timing = {}
        for key in sorted(metrics):
            print('Getting metrics for %s' % key)
            started = time.time()
            self.query(hostname, app_id, ','.join(metrics[key]),
                       current_time_secs)
            elapsed = time.time() - started

            print('Query for "%s" took %s' % (key, elapsed))
            metric_timing[key] = elapsed

        add_query_metrics_for_app_id(app_id, metric_timing)
        if self.print_responses:
            print_app_metrics(app_id, metric_timing)

    def query(self, hostname, app_id, metric_names, current_time_secs):
        """Build and send a single query; print the URL and, optionally, the
        response.

        Returns the response body, or None when the request failed.
        """
        url = self.create_url(hostname, metric_names, app_id,
                              current_time_secs)
        print(url)
        response = self.send(url)
        # A failed request was already reported by send(); do not print the
        # literal "None" as if it were a response body.
        if self.print_responses and response is not None:
            print(response)
        return response

    def send(self, url):
        """GET url and return the response body as text.

        Returns None after printing the reason when the request fails. Socket
        errors (including a timeout while reading the body) are handled too,
        so that one failed query does not abort the polling loop.
        """
        try:
            # closing() releases the connection on both Python 2 and 3.
            with contextlib.closing(
                    urlopen(Request(url),
                            timeout=HTTP_TIMEOUT_SECS)) as response:
                body = response.read()
        except (URLError, socket.error) as e:
            print(getattr(e, 'reason', e))
            return None

        # Python 3 returns bytes; Python 2 already returns str.
        if not isinstance(body, str):
            body = body.decode('utf-8', 'replace')
        return body

    def create_url(self, hostname, metric_names, app_id, current_time_secs):
        """Return the query URL for the given metrics.

        The queried range starts at current_time_secs and spans
        QUERY_WINDOW_SECS.
        """
        server = AMS_URL.format(self.metrics_address,
                                metric_names,
                                app_id,
                                hostname)
        t = current_time_secs
        server += '&startTime=%s&endTime=%s' % (t, t + QUERY_WINDOW_SECS)
        return server


if __name__ == '__main__':
    main()
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start.sh b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start.sh new file mode 100644 index 0000000..1d6fff5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# set -x +cd "$(dirname "$0")" + +METRICS_HOST=$1 +HOST_COUNT=$2 + +echo $$ > sim_pid +cat sim_pid +#HOMEDIR +exec java -jar ../lib/ambari-metrics/ambari-metrics-timelineservice-simulator*.jar -h `hostname -f` -n ${HOST_COUNT} -m ${METRICS_HOST} -c 15000 -s 60000 http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start_slaves.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start_slaves.sh b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start_slaves.sh new file mode 100644 index 0000000..e1e51c8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/start_slaves.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+## set -x + +SLAVES=$1 +METRICS_HOST=$2 +HOST_COUNT=$3 +LOADSIM_HOMEDIR="LoadSimulator-1.0-SNAPSHOT" + +pdsh -w ${SLAVES} "$LOADSIM_HOMEDIR/start.sh ${METRICS_HOST} ${HOST_COUNT}'" http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/status_slaves.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/status_slaves.sh b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/status_slaves.sh new file mode 100644 index 0000000..79787fd --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/status_slaves.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+## set -x + + http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop.sh b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop.sh new file mode 100644 index 0000000..2220861 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# set -x +cd "$(dirname "$0")" +read PID <./sim_pid + +FOUND_PID=`ps aux | grep simulator | grep $PID` +if [ -z "${FOUND_PID}" ] +then + echo "process simulator not fund under pid $PID" +else + echo "process simulator running as $PID, killing" + kill ${PID} +fi http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop_slaves.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop_slaves.sh b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop_slaves.sh new file mode 100644 index 0000000..7cb567a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/resources/scripts/stop_slaves.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# set -x + +SLAVES=$1 +LOADSIM_HOMEDIR="LoadSimulator-1.0-SNAPSHOT" + +pdsh -w ${SLAVES} "$LOADSIM_HOMEDIR/stop.sh" http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/ams-site.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/ams-site.xml b/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/ams-site.xml new file mode 100644 index 0000000..b08609d --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/ams-site.xml @@ -0,0 +1,29 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +--> +<!-- Empty site, all defaults should be initialized in the code --> +<configuration> + <property> + <name>test</name> + <value>testReady</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hadoop-policy.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hadoop-policy.xml b/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hadoop-policy.xml new file mode 100644 index 0000000..41bde16 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hadoop-policy.xml @@ -0,0 +1,134 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- Put site-specific property overrides in this file. --> + +<configuration supports_final="true"> + <property> + <name>security.client.protocol.acl</name> + <value>*</value> + <description>ACL for ClientProtocol, which is used by user code + via the DistributedFileSystem. + The ACL is a comma-separated list of user and group names. 
The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.client.datanode.protocol.acl</name> + <value>*</value> + <description>ACL for ClientDatanodeProtocol, the client-to-datanode protocol + for block recovery. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.datanode.protocol.acl</name> + <value>*</value> + <description>ACL for DatanodeProtocol, which is used by datanodes to + communicate with the namenode. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.inter.datanode.protocol.acl</name> + <value>*</value> + <description>ACL for InterDatanodeProtocol, the inter-datanode protocol + for updating generation timestamp. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.namenode.protocol.acl</name> + <value>*</value> + <description>ACL for NamenodeProtocol, the protocol used by the secondary + namenode to communicate with the namenode. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". 
+ A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.inter.tracker.protocol.acl</name> + <value>*</value> + <description>ACL for InterTrackerProtocol, used by the tasktrackers to + communicate with the jobtracker. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.job.client.protocol.acl</name> + <value>*</value> + <description>ACL for JobSubmissionProtocol, used by job clients to + communicate with the jobtracker for job submission, querying job status etc. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.job.task.protocol.acl</name> + <value>*</value> + <description>ACL for TaskUmbilicalProtocol, used by the map and reduce + tasks to communicate with the parent tasktracker. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.admin.operations.protocol.acl</name> + <value>hadoop</value> + <description>ACL for AdminOperationsProtocol. Used for admin commands. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + <property> + <name>security.refresh.usertogroups.mappings.protocol.acl</name> + <value>hadoop</value> + <description>ACL for RefreshUserMappingsProtocol. Used to refresh + users mappings.
The ACL is a comma-separated list of user and + group names. The user and group list is separated by a blank. For + e.g. "alice,bob users,wheel". A special value of "*" means all + users are allowed.</description> + </property> + +<property> + <name>security.refresh.policy.protocol.acl</name> + <value>hadoop</value> + <description>ACL for RefreshAuthorizationPolicyProtocol, used by the + dfsadmin and mradmin commands to refresh the security policy in-effect. + The ACL is a comma-separated list of user and group names. The user and + group list is separated by a blank. For e.g. "alice,bob users,wheel". + A special value of "*" means all users are allowed.</description> + </property> + + +</configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hbase-site.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hbase-site.xml b/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hbase-site.xml new file mode 100644 index 0000000..c453900 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/conf/hbase-site.xml @@ -0,0 +1,230 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<configuration> + <property> + <name>hbase.defaults.for.version.skip</name> + <value>true</value> + </property> + <property> + <name>hbase.rootdir</name> + <value>file:///var/lib/hbase</value> + <description> + AMS service uses HBase as default storage backend. Set the rootdir for + HBase to either local filesystem path if using AMS in embedded mode or + to a HDFS dir, example: hdfs://namenode.example.org:9000/hbase. By + default HBase writes into /tmp. Change this configuration else all data + will be lost on machine restart. + </description> + </property> + <property> + <name>hbase.tmp.dir</name> + <value>/tmp</value> + <description> + Temporary directory on the local filesystem. + Change this setting to point to a location more permanent + than '/tmp' (The '/tmp' directory is often cleared on + machine restart). + </description> + </property> + <property> + <name>hbase.local.dir</name> + <value>${hbase.tmp.dir}/local</value> + <description>Directory on the local filesystem to be used as a local storage + </description> + </property> + <property> + <name>hbase.cluster.distributed</name> + <value>true</value> + <description> + The mode the cluster will be in. Possible values are false for + standalone mode and true for distributed mode. If false, startup will run + all HBase and ZooKeeper daemons together in the one JVM. + </description> + </property> + <property> + <name>hbase.master.wait.on.regionservers.mintostart</name> + <value>1</value> + <description> + Ensure that HBase Master waits for # many region server to start. 
+ </description> + </property> + <property> + <name>hbase.zookeeper.quorum</name> + <value>localhost</value> + <description>Comma separated list of servers in the ZooKeeper Quorum. + For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com". + By default this is set to localhost for local and pseudo-distributed modes + of operation. For a fully-distributed setup, this should be set to a full + list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh + this is the list of servers which we will start/stop ZooKeeper on. + </description> + </property> + <property> + <name>hbase.master.info.bindAddress</name> + <value>0.0.0.0</value> + <description>The bind address for the HBase Master web UI</description> + </property> + <property> + <name>hbase.master.info.port</name> + <value>90010</value> + <description>The port for the HBase Master web UI.</description> + </property> + <property> + <name>hbase.regionserver.info.port</name> + <value>90030</value> + <description>The port for the HBase RegionServer web UI.</description> + </property> + <property> + <name>hbase.hregion.majorcompaction</name> + <value>0</value> + <description> + The time (in milliseconds) between 'major' compactions of all + HStoreFiles in a region. + 0 to disable automated major compactions. + </description> + </property> + <property> + <name>phoenix.query.spoolThresholdBytes</name> + <value>12582912</value> + <description> + Threshold size in bytes after which results from parallelly executed + query results are spooled to disk. Default is 20 mb. + </description> + </property> + <property> + <name>hbase.zookeeper.property.dataDir</name> + <value>${hbase.tmp.dir}/zookeeper</value> + <description> + Property from ZooKeeper's config zoo.cfg. + The directory where the snapshot is stored. 
+ </description> + </property> + <property> + <name>hbase.client.scanner.caching</name> + <value>10000</value> + <description> + Number of rows that will be fetched when calling next on a scanner + if it is not served from (local, client) memory. + </description> + </property> + <property> + <name>hfile.block.cache.size</name> + <value>0.3</value> + <description> + Percentage of maximum heap (-Xmx setting) to allocate to block cache + used by a StoreFile. Default of 0.4 means allocate 40%. + </description> + </property> + <property> + <name>hbase.regionserver.global.memstore.upperLimit</name> + <value>0.5</value> + <description> + Maximum size of all memstores in a region server before new + updates are blocked and flushes are forced. Defaults to 40% of heap + </description> + </property> + <property> + <name>hbase.regionserver.global.memstore.lowerLimit</name> + <value>0.4</value> + <description> + When memstores are being forced to flush to make room in + memory, keep flushing until we hit this mark. Defaults to 35% of heap. + This value equal to hbase.regionserver.global.memstore.upperLimit causes + the minimum possible flushing to occur when updates are blocked due to + memstore limiting. + </description> + </property> + <property> + <name>phoenix.groupby.maxCacheSize</name> + <value>307200000</value> + <description> + Size in bytes of pages cached during GROUP BY spilling. Default is 100Mb. + </description> + </property> + <property> + <name>hbase.hregion.memstore.block.multiplier</name> + <value>4</value> + <description> + Block updates if memstore has hbase.hregion.memstore.block.multiplier + times hbase.hregion.memstore.flush.size bytes. Useful preventing runaway + memstore during spikes in update traffic. + </description> + </property> + <property> + <name>hbase.hstore.flusher.count</name> + <value>2</value> + <description> + The number of flush threads. With fewer threads, the MemStore flushes + will be queued. 
With more threads, the flushes will be executed in parallel, + increasing the load on HDFS, and potentially causing more compactions. + </description> + </property> + <property> + <name>phoenix.query.timeoutMs</name> + <value>1200000</value> + <description> + Number of milliseconds after which a query will timeout on the client. + Default is 10 min. + </description> + </property> + <property> + <name>hbase.client.scanner.timeout.period</name> + <value>900000</value> + <description> + Client scanner lease period in milliseconds. + </description> + </property> + <property> + <name>hbase.regionserver.thread.compaction.large</name> + <value>2</value> + <description> + Configuration key for the large compaction threads. + </description> + </property> + <property> + <name>hbase.regionserver.thread.compaction.small</name> + <value>3</value> + <description> + Configuration key for the small compaction threads. + </description> + </property> + <property> + <name>hbase.hstore.blockingStoreFiles</name> + <value>200</value> + <description> + If more than this number of StoreFiles exist in any one Store + (one StoreFile is written per flush of MemStore), updates are blocked for + this region until a compaction is completed, or until + hbase.hstore.blockingWaitTime has been exceeded. + </description> + </property> + <property> + <name>hbase.hregion.memstore.flush.size</name> + <value>134217728</value> + <description> + Memstore will be flushed to disk if size of the memstore exceeds this + number of bytes. Value is checked by a thread that runs every + hbase.server.thread.wakefrequency. 
+ </description> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java new file mode 100644 index 0000000..f7e53f5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; + +
/**
 * Integration tests for the cluster-level metric aggregators
 * ({@code TimelineMetricClusterAggregator} and
 * {@code TimelineMetricClusterAggregatorHourly}).
 *
 * Each test writes records through a {@link PhoenixHBaseAccessor}, runs one
 * aggregation cycle and reads the aggregated rows back over JDBC.
 */
public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
  /** Read connection shared by one test; opened in setUp, closed in tearDown. */
  private Connection conn;
  private PhoenixHBaseAccessor hdb;

  /**
   * Creates the accessor under test, opens the shared read connection and
   * initialises the metric schema.
   */
  @Before
  public void setUp() throws Exception {
    hdb = createTestableHBaseAccessor();
    // inits connection, starts mini cluster
    conn = getConnection(getUrl());

    hdb.initMetricSchema();
  }

  /**
   * Empties every metric table and releases the JDBC resources of the test.
   *
   * The cleanup uses its own connection (as before); the connection opened in
   * {@link #setUp()} is now closed as well. Previously a local variable named
   * {@code conn} shadowed the field, so the field's connection leaked after
   * every test.
   */
  @After
  public void tearDown() throws Exception {
    try {
      Connection cleanupConn = getConnection(getUrl());
      try {
        Statement stmt = cleanupConn.createStatement();
        try {
          stmt.execute("delete from METRIC_AGGREGATE");
          stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
          stmt.execute("delete from METRIC_RECORD");
          stmt.execute("delete from METRIC_RECORD_HOURLY");
          stmt.execute("delete from METRIC_RECORD_MINUTE");
          cleanupConn.commit();
        } finally {
          stmt.close();
        }
      } finally {
        cleanupConn.close();
      }
    } finally {
      // Closing the connection also releases statements created from it,
      // including those handed out by executeQuery().
      if (conn != null) {
        conn.close();
        conn = null;
      }
    }
  }

  /**
   * Two hosts report {@code disk_free}; every aggregated row must cover both
   * hosts with min 1, max 2 and sum 3.
   */
  @Test
  public void testShouldAggregateClusterProperly() throws Exception {
    // GIVEN
    TimelineMetricClusterAggregator agg =
      new TimelineMetricClusterAggregator(hdb, new Configuration());

    long startTime = System.currentTimeMillis();
    long ctime = startTime;
    long minute = 60 * 1000;
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
      "disk_free", 1));
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
      "disk_free", 2));
    ctime += minute;
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
      "disk_free", 2));
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
      "disk_free", 1));

    // WHEN
    long endTime = ctime + minute;
    boolean success = agg.doWork(startTime, endTime);

    // THEN
    assertEquals("Cluster aggregation should succeed", true, success);

    Condition condition = new Condition(null, null, null, null, startTime,
      endTime, null, true);
    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));

    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
      (conn, condition);
    ResultSet rs = pstmt.executeQuery();

    int recordCount = 0;
    while (rs.next()) {
      TimelineClusterMetric currentMetric =
        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
      MetricClusterAggregate currentHostAggregate =
        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);

      if ("disk_free".equals(currentMetric.getMetricName())) {
        assertEquals(2, currentHostAggregate.getNumberOfHosts());
        assertEquals(2.0, currentHostAggregate.getMax());
        assertEquals(1.0, currentHostAggregate.getMin());
        assertEquals(3.0, currentHostAggregate.getSum());
        recordCount++;
      } else {
        fail("Unexpected entry");
      }
    }

    // Without this check the test passes even when nothing was aggregated.
    if (recordCount == 0) {
      fail("No aggregated cluster records were returned");
    }
  }

  /**
   * Same as {@link #testShouldAggregateClusterProperly()} with an additional
   * {@code disk_used} metric reported by a single host.
   */
  @Test
  public void testShouldAggregateDifferentMetricsOnClusterProperly()
    throws Exception {
    // GIVEN
    TimelineMetricClusterAggregator agg =
      new TimelineMetricClusterAggregator(hdb, new Configuration());

    // here we put some metrics that will be aggregated
    long startTime = System.currentTimeMillis();
    long ctime = startTime;
    long minute = 60 * 1000;
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
      "disk_free", 1));
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
      "disk_free", 2));
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
      "disk_used", 1));

    ctime += minute;
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
      "disk_free", 2));
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
      "disk_free", 1));
    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
      "disk_used", 1));

    // WHEN
    long endTime = ctime + minute;
    boolean success = agg.doWork(startTime, endTime);

    // THEN
    assertEquals("Cluster aggregation should succeed", true, success);

    Condition condition = new Condition(null, null, null, null, startTime,
      endTime, null, true);
    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));

    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
      (conn, condition);
    ResultSet rs = pstmt.executeQuery();

    int recordCount = 0;
    while (rs.next()) {
      TimelineClusterMetric currentMetric =
        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
      MetricClusterAggregate currentHostAggregate =
        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);

      if ("disk_free".equals(currentMetric.getMetricName())) {
        assertEquals(2, currentHostAggregate.getNumberOfHosts());
        assertEquals(2.0, currentHostAggregate.getMax());
        assertEquals(1.0, currentHostAggregate.getMin());
        assertEquals(3.0, currentHostAggregate.getSum());
        recordCount++;
      } else if ("disk_used".equals(currentMetric.getMetricName())) {
        assertEquals(1, currentHostAggregate.getNumberOfHosts());
        assertEquals(1.0, currentHostAggregate.getMax());
        assertEquals(1.0, currentHostAggregate.getMin());
        assertEquals(1.0, currentHostAggregate.getSum());
        recordCount++;
      } else {
        fail("Unexpected entry");
      }
    }

    // Without this check the test passes even when nothing was aggregated.
    if (recordCount == 0) {
      fail("No aggregated cluster records were returned");
    }
  }

  /**
   * Four one-minute cluster aggregates of {@code disk_used} must collapse
   * into exactly one hourly row.
   */
  @Test
  public void testShouldAggregateClusterOnHourProperly() throws Exception {
    // GIVEN
    TimelineMetricClusterAggregatorHourly agg =
      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());

    // this time can be virtualized! or made independent from real clock
    long startTime = System.currentTimeMillis();
    long ctime = startTime;
    long minute = 60 * 1000;

    Map<TimelineClusterMetric, MetricClusterAggregate> records =
      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();

    records.put(createEmptyTimelineMetric(ctime),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric(ctime += minute),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric(ctime += minute),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric(ctime += minute),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));

    hdb.saveClusterAggregateRecords(records);

    // WHEN
    agg.doWork(startTime, ctime + minute);

    // THEN
    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
    int count = 0;
    while (rs.next()) {
      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
      count++;
    }

    assertEquals("One hourly aggregated row expected ", 1, count);
  }

  /**
   * Four one-minute aggregates each of {@code disk_used} and
   * {@code disk_free} must yield exactly two hourly rows.
   */
  @Test
  public void testShouldAggregateDifferentMetricsOnHourProperly() throws
    Exception {
    // GIVEN
    TimelineMetricClusterAggregatorHourly agg =
      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());

    long startTime = System.currentTimeMillis();
    long ctime = startTime;
    long minute = 60 * 1000;

    Map<TimelineClusterMetric, MetricClusterAggregate> records =
      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();

    records.put(createEmptyTimelineMetric("disk_used", ctime),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric("disk_free", ctime),
      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));

    records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric("disk_free", ctime),
      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));

    records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric("disk_free", ctime),
      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));

    records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
    records.put(createEmptyTimelineMetric("disk_free", ctime),
      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));

    hdb.saveClusterAggregateRecords(records);

    // WHEN
    agg.doWork(startTime, ctime + minute);

    // THEN
    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
    int count = 0;
    while (rs.next()) {
      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
      }

      count++;
    }

    assertEquals("Two hourly aggregated row expected ", 2, count);
  }

  /**
   * Runs {@code query} on the shared test connection.
   *
   * The statement is deliberately left open so the caller can iterate the
   * returned result set; it is released when {@link #tearDown()} closes the
   * connection. Previously every call opened a connection that was never
   * closed.
   */
  private ResultSet executeQuery(String query) throws SQLException {
    Statement stmt = conn.createStatement();
    return stmt.executeQuery(query);
  }

  /** Builds a cluster metric key for app {@code test_app} with the given name and time. */
  private TimelineClusterMetric createEmptyTimelineMetric(String name,
                                                          long startTime) {
    TimelineClusterMetric metric = new TimelineClusterMetric(name,
      "test_app", null, startTime, null);

    return metric;
  }

  /** Shortcut for a {@code disk_used} cluster metric key. */
  private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
    return createEmptyTimelineMetric("disk_used", startTime);
  }

  /**
   * Creates an accessor whose connections come straight from
   * {@link DriverManager} using the test URL, with the compression scheme
   * set to "NONE".
   */
  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
    Configuration metricsConf = new Configuration();
    metricsConf.set(
      TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");

    return
      new PhoenixHBaseAccessor(
        new Configuration(),
        metricsConf,
        new ConnectionProvider() {
          @Override
          public Connection getConnection() {
            Connection connection = null;
            try {
              connection = DriverManager.getConnection(getUrl());
            } catch (SQLException e) {
              // Callers receive null when the connection cannot be opened.
              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
            }
            return connection;
          }
        });
  }

  /** Wraps a single metric (see createTimelineMetric) in a TimelineMetrics container. */
  private TimelineMetrics prepareSingleTimelineMetric(long startTime,
                                                      String host,
                                                      String metricName,
                                                      double val) {
    TimelineMetrics m = new TimelineMetrics();
    m.setMetrics(Arrays.asList(
      createTimelineMetric(startTime, metricName, host, val)));

    return m;
  }

  /**
   * Builds a metric for app {@code host} carrying the same value at
   * +15s, +30s, +45s and +60s after {@code startTime}.
   */
  private TimelineMetric createTimelineMetric(long startTime,
                                              String metricName,
                                              String host,
                                              double val) {
    TimelineMetric m = new TimelineMetric();
    m.setAppId("host");
    m.setHostName(host);
    m.setMetricName(metricName);
    m.setStartTime(startTime);
    Map<Long, Double> vals = new HashMap<Long, Double>();
    vals.put(startTime + 15000L, val);
    vals.put(startTime + 30000L, val);
    vals.put(startTime + 45000L, val);
    vals.put(startTime + 60000L, val);

    m.setMetricValues(vals);

    return m;
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java new file mode 100644 index 0000000..d166a22 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; +import 
static org.assertj.core.api.Assertions.assertThat; + +
/**
 * Integration tests for storing host metrics through
 * {@link PhoenixHBaseAccessor} and for the minute and hourly host-level
 * aggregators created by {@code TimelineMetricAggregatorFactory}.
 */
public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
  /** Read connection shared by one test; opened in setUp, closed in tearDown. */
  private Connection conn;
  private PhoenixHBaseAccessor hdb;

  /**
   * Creates the accessor under test, opens the shared read connection and
   * initialises the metric schema.
   */
  @Before
  public void setUp() throws Exception {
    hdb = createTestableHBaseAccessor();
    // inits connection, starts mini cluster
    conn = getConnection(getUrl());

    hdb.initMetricSchema();
  }

  /**
   * Empties every metric table and releases the JDBC resources of the test.
   *
   * The cleanup uses its own connection (as before); the connection opened in
   * {@link #setUp()} is now closed as well. Previously a local variable named
   * {@code conn} shadowed the field, so the field's connection leaked after
   * every test.
   */
  @After
  public void tearDown() throws Exception {
    try {
      Connection cleanupConn = getConnection(getUrl());
      try {
        Statement stmt = cleanupConn.createStatement();
        try {
          stmt.execute("delete from METRIC_AGGREGATE");
          stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
          stmt.execute("delete from METRIC_RECORD");
          stmt.execute("delete from METRIC_RECORD_HOURLY");
          stmt.execute("delete from METRIC_RECORD_MINUTE");
          cleanupConn.commit();
        } finally {
          stmt.close();
        }
      } finally {
        cleanupConn.close();
      }
    } finally {
      // Closing the connection also releases statements created from it.
      if (conn != null) {
        conn.close();
        conn = null;
      }
    }
  }

  /**
   * Metrics written for host {@code local} must be read back unchanged
   * (compared with {@link #TIME_IGNORING_COMPARATOR}).
   */
  @Test
  public void testShouldInsertMetrics() throws Exception {
    // GIVEN

    // WHEN
    long startTime = System.currentTimeMillis();
    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
    hdb.insertMetricRecords(metricsSent);

    Condition queryCondition = new Condition(null, "local", null, null,
      startTime, startTime + (15 * 60 * 1000), null, false);
    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);

    // THEN
    assertThat(recordRead.getMetrics()).hasSize(2)
      .extracting("metricName")
      .containsOnly("mem_free", "disk_free");

    assertThat(metricsSent.getMetrics())
      .usingElementComparator(TIME_IGNORING_COMPARATOR)
      .containsExactlyElementsOf(recordRead.getMetrics());
  }

  /**
   * Five one-minute batches (4 samples each, values 0, 0, 1, 2) must be
   * aggregated into one row per metric with 20 samples and sum 15.
   */
  @Test
  public void testShouldAggregateMinuteProperly() throws Exception {
    // GIVEN
    TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory
      .createTimelineMetricAggregatorMinute(hdb, new Configuration());

    long startTime = System.currentTimeMillis();
    long ctime = startTime;
    long minute = 60 * 1000;
    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));

    // WHEN
    long endTime = startTime + 1000 * 60 * 4;
    boolean success = aggregatorMinute.doWork(startTime, endTime);
    // The result used to be ignored; the hourly test already checks it.
    assertTrue(success);

    // THEN
    Condition condition = new Condition(null, null, null, null, startTime,
      endTime, null, true);
    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
      METRICS_AGGREGATE_MINUTE_TABLE_NAME));

    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
      (conn, condition);
    ResultSet rs = pstmt.executeQuery();

    int count = 0;
    while (rs.next()) {
      TimelineMetric currentMetric =
        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
      MetricHostAggregate currentHostAggregate =
        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);

      if ("disk_free".equals(currentMetric.getMetricName())) {
        assertEquals(2.0, currentHostAggregate.getMax());
        assertEquals(0.0, currentHostAggregate.getMin());
        assertEquals(20, currentHostAggregate.getNumberOfSamples());
        assertEquals(15.0, currentHostAggregate.getSum());
        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
        count++;
      } else if ("mem_free".equals(currentMetric.getMetricName())) {
        assertEquals(2.0, currentHostAggregate.getMax());
        assertEquals(0.0, currentHostAggregate.getMin());
        assertEquals(20, currentHostAggregate.getNumberOfSamples());
        assertEquals(15.0, currentHostAggregate.getSum());
        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
        count++;
      } else {
        fail("Unexpected entry");
      }
    }
    assertEquals("Two aggregated entries expected", 2, count);
  }

  /**
   * Twelve five-minute aggregates of {@code disk_used} must be rolled up
   * into an hourly aggregate with 12 * 20 samples and sum 12 * 15.
   */
  @Test
  public void testShouldAggregateHourProperly() throws Exception {
    // GIVEN
    TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory
      .createTimelineMetricAggregatorHourly(hdb, new Configuration());
    long startTime = System.currentTimeMillis();

    MetricHostAggregate expectedAggregate =
      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
    Map<TimelineMetric, MetricHostAggregate>
      aggMap = new HashMap<TimelineMetric,
      MetricHostAggregate>();

    int min_5 = 5 * 60 * 1000;
    // Start one step early so the first key lands exactly on startTime.
    long ctime = startTime - min_5;
    for (int i = 0; i < 12; i++) {
      aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
    }

    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);

    // WHEN
    long endTime = ctime + min_5;
    boolean success = aggregator.doWork(startTime, endTime);
    assertTrue(success);

    // THEN
    Condition condition = new Condition(null, null, null, null, startTime,
      endTime, null, true);
    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
      METRICS_AGGREGATE_HOURLY_TABLE_NAME));

    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
      (conn, condition);
    ResultSet rs = pstmt.executeQuery();

    int count = 0;
    while (rs.next()) {
      TimelineMetric currentMetric =
        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
      MetricHostAggregate currentHostAggregate =
        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);

      if ("disk_used".equals(currentMetric.getMetricName())) {
        assertEquals(2.0, currentHostAggregate.getMax());
        assertEquals(0.0, currentHostAggregate.getMin());
        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
        assertEquals(12 * 15.0, currentHostAggregate.getSum());
        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
        count++;
      }
    }
    // Without this check the test passes even when nothing was aggregated.
    assertTrue("At least one hourly disk_used aggregate expected", count > 0);
  }

  /** Builds a {@code disk_used} metric key for test_app/test_host at the given timestamp. */
  private TimelineMetric createEmptyTimelineMetric(long startTime) {
    TimelineMetric metric = new TimelineMetric();
    metric.setMetricName("disk_used");
    metric.setAppId("test_app");
    metric.setHostName("test_host");
    metric.setTimestamp(startTime);

    return metric;
  }

  /** Builds a host aggregate populated with the given max, min, sample count and sum. */
  private MetricHostAggregate
  createMetricHostAggregate(double max, double min, int numberOfSamples,
                            double sum) {
    MetricHostAggregate expectedAggregate =
      new MetricHostAggregate();
    expectedAggregate.setMax(max);
    expectedAggregate.setMin(min);
    expectedAggregate.setNumberOfSamples(numberOfSamples);
    expectedAggregate.setSum(sum);

    return expectedAggregate;
  }

  /**
   * Creates an accessor whose connections come straight from
   * {@link DriverManager} using the test URL, with the compression scheme
   * set to "NONE".
   */
  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
    Configuration metricsConf = new Configuration();
    metricsConf.set(
      TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");

    return
      new PhoenixHBaseAccessor(
        new Configuration(),
        metricsConf,
        new ConnectionProvider() {
          @Override
          public Connection getConnection() {
            Connection connection = null;
            try {
              connection = DriverManager.getConnection(getUrl());
            } catch (SQLException e) {
              // Callers receive null when the connection cannot be opened.
              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
            }
            return connection;
          }
        });
  }

  /**
   * Equality-only comparator: returns 0 when the metrics match apart from
   * time and 1 otherwise. It never returns a negative value, so it does NOT
   * define an ordering and must not be used for sorting or sorted
   * collections; it is only passed to the element-wise assertion in
   * {@link #testShouldInsertMetrics()}.
   */
  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
    new Comparator<TimelineMetric>() {
      @Override
      public int compare(TimelineMetric o1, TimelineMetric o2) {
        return o1.equalsExceptTime(o2) ? 0 : 1;
      }
    };

  /** Builds a {@code disk_free} and a {@code mem_free} metric for the given host. */
  private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
    TimelineMetrics metrics = new TimelineMetrics();
    metrics.setMetrics(Arrays.asList(
      createMetric(startTime, "disk_free", host),
      createMetric(startTime, "mem_free", host)));

    return metrics;
  }

  /**
   * Builds a metric for app {@code host} with values 0, 0, 1 and 2 at
   * +15s, +30s, +45s and +60s after {@code startTime}.
   */
  private TimelineMetric createMetric(long startTime,
                                      String metricName,
                                      String host) {
    TimelineMetric m = new TimelineMetric();
    m.setAppId("host");
    m.setHostName(host);
    m.setMetricName(metricName);
    m.setStartTime(startTime);
    Map<Long, Double> vals = new HashMap<Long, Double>();
    vals.put(startTime + 15000L, 0.0);
    vals.put(startTime + 30000L, 0.0);
    vals.put(startTime + 45000L, 1.0);
    vals.put(startTime + 60000L, 2.0);

    m.setMetricValues(vals);

    return m;
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java new file mode 100644 index 0000000..0722ccd --- /dev/null +++
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +import static org.junit.runners.Suite.SuiteClasses; + +
/**
 * JUnit suite bundling the two aggregator integration test classes,
 * {@link ITMetricAggregator} and {@link ITClusterAggregator}.
 */
@RunWith(Suite.class)
@SuiteClasses({
  ITMetricAggregator.class,
  ITClusterAggregator.class
})
public class TestClusterSuite {
  // Intentionally empty: the annotations above define the suite.
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebApp.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebApp.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebApp.java new file mode 100644 index 0000000..5f23f83 --- /dev/null +++
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebApp.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; + +import static org.apache.hadoop.yarn.webapp.Params.TITLE; +import static org.mockito.Mockito.mock; +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationContext; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStoreTestUtils; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; +import org.apache.hadoop.yarn.util.StringHelper; +import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.hadoop.yarn.webapp.test.WebAppTests; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Injector; + +import javax.servlet.http.HttpServletResponse; + +
/**
 * Smoke tests for the application-history web UI: the controller title and
 * the rendering of the list, application, attempt and container pages
 * against an in-memory history store.
 */
public class TestAHSWebApp extends ApplicationHistoryStoreTestUtils {

  /** Lets other tests supply the store that the write helpers and the mock manager use. */
  public void setApplicationHistoryStore(ApplicationHistoryStore store) {
    this.store = store;
  }

  @Before
  public void setup() {
    store = new MemoryApplicationHistoryStore();
  }

  /** The index action must set the page title to "Application History". */
  @Test
  public void testAppControllerIndex() throws Exception {
    ApplicationHistoryManager ahManager = mock(ApplicationHistoryManager.class);
    Injector injector =
        WebAppTests.createMockInjector(ApplicationHistoryManager.class,
          ahManager);
    AHSController controller = injector.getInstance(AHSController.class);
    controller.index();
    Assert.assertEquals("Application History", controller.get(TITLE, "unknown"));
  }

  /** Renders the application list unfiltered, then filtered by one and by two states. */
  @Test
  public void testView() throws Exception {
    Injector injector =
        WebAppTests.createMockInjector(ApplicationContext.class,
          mockApplicationHistoryManager(5, 1, 1));
    AHSView ahsViewInstance = injector.getInstance(AHSView.class);

    ahsViewInstance.render();
    //WebAppTests.flushOutput(injector);

    ahsViewInstance.set(YarnWebParams.APP_STATE,
      YarnApplicationState.FAILED.toString());
    ahsViewInstance.render();
    //WebAppTests.flushOutput(injector);

    ahsViewInstance.set(YarnWebParams.APP_STATE, StringHelper.cjoin(
      YarnApplicationState.FAILED.toString(), YarnApplicationState.KILLED));
    ahsViewInstance.render();
    //WebAppTests.flushOutput(injector);
  }

  /** Renders the application page without and with an application id. */
  @Test
  public void testAppPage() throws Exception {
    Injector injector =
        WebAppTests.createMockInjector(ApplicationContext.class,
          mockApplicationHistoryManager(1, 5, 1));
    AppPage appPageInstance = injector.getInstance(AppPage.class);

    appPageInstance.render();
    //WebAppTests.flushOutput(injector);

    appPageInstance.set(YarnWebParams.APPLICATION_ID, ApplicationId
      .newInstance(0, 1).toString());
    appPageInstance.render();
    // WebAppTests.flushOutput(injector);
  }

  /** Renders the attempt page without and with an application attempt id. */
  @Test
  public void testAppAttemptPage() throws Exception {
    Injector injector =
        WebAppTests.createMockInjector(ApplicationContext.class,
          mockApplicationHistoryManager(1, 1, 5));
    AppAttemptPage appAttemptPageInstance =
        injector.getInstance(AppAttemptPage.class);

    appAttemptPageInstance.render();
    //WebAppTests.flushOutput(injector);

    appAttemptPageInstance.set(YarnWebParams.APPLICATION_ATTEMPT_ID,
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1)
        .toString());
    appAttemptPageInstance.render();
    //WebAppTests.flushOutput(injector);
  }

  /** Renders the container page without and with a container id. */
  @Test
  public void testContainerPage() throws Exception {
    Injector injector =
        WebAppTests.createMockInjector(ApplicationContext.class,
          mockApplicationHistoryManager(1, 1, 1));
    ContainerPage containerPageInstance =
        injector.getInstance(ContainerPage.class);

    containerPageInstance.render();
    //WebAppTests.flushOutput(injector);

    containerPageInstance.set(
      YarnWebParams.CONTAINER_ID,
      ContainerId
        .newInstance(
          ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
          1).toString());
    containerPageInstance.render();
    //WebAppTests.flushOutput(injector);
  }

  /**
   * Creates a started history manager backed by the current store and
   * writes start/finish records for {@code numApps} applications, each with
   * {@code numAppAttempts} attempts of {@code numContainers} containers.
   * Ids are numbered from 1.
   */
  ApplicationHistoryManager mockApplicationHistoryManager(int numApps,
      int numAppAttempts, int numContainers) throws Exception {
    ApplicationHistoryManager ahManager =
        new MockApplicationHistoryManagerImpl(store);
    for (int i = 1; i <= numApps; ++i) {
      ApplicationId appId = ApplicationId.newInstance(0, i);
      writeApplicationStartData(appId);
      for (int j = 1; j <= numAppAttempts; ++j) {
        ApplicationAttemptId appAttemptId =
            ApplicationAttemptId.newInstance(appId, j);
        writeApplicationAttemptStartData(appAttemptId);
        for (int k = 1; k <= numContainers; ++k) {
          ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
          writeContainerStartData(containerId);
          writeContainerFinishData(containerId);
        }
        writeApplicationAttemptFinishData(appAttemptId);
      }
      writeApplicationFinishData(appId);
    }
    return ahManager;
  }

  /**
   * History manager that serves the store passed to its constructor instead
   * of creating one from configuration.
   */
  class MockApplicationHistoryManagerImpl extends ApplicationHistoryManagerImpl {

    /** Store returned by createApplicationHistoryStore(Configuration). */
    private final ApplicationHistoryStore mockStore;

    public MockApplicationHistoryManagerImpl(ApplicationHistoryStore store) {
      super();
      // The argument used to be ignored and the enclosing test's field was
      // read instead; it must be captured before init(), which presumably
      // triggers createApplicationHistoryStore() - confirm against
      // ApplicationHistoryManagerImpl.
      this.mockStore = store;
      init(new YarnConfiguration());
      start();
    }

    @Override
    protected ApplicationHistoryStore createApplicationHistoryStore(
        Configuration conf) {
      return mockStore;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java new file mode 100644 index 0000000..7ca5a03 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import javax.ws.rs.core.MediaType; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.ApplicationContext; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.WebServicesTestUtils; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Before; 
+import org.junit.Test; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceServletContextListener; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.JerseyTest; +import com.sun.jersey.test.framework.WebAppDescriptor; + +public class TestAHSWebServices extends JerseyTest { + + private static ApplicationHistoryManager ahManager; + + private Injector injector = Guice.createInjector(new ServletModule() { + + @Override + protected void configureServlets() { + bind(JAXBContextResolver.class); + bind(AHSWebServices.class); + bind(GenericExceptionHandler.class); + try { + ahManager = mockApplicationHistoryManager(); + } catch (Exception e) { + Assert.fail(); + } + bind(ApplicationContext.class).toInstance(ahManager); + serve("/*").with(GuiceContainer.class); + } + }); + + public class GuiceServletConfig extends GuiceServletContextListener { + + @Override + protected Injector getInjector() { + return injector; + } + } + + private ApplicationHistoryManager mockApplicationHistoryManager() + throws Exception { + ApplicationHistoryStore store = new MemoryApplicationHistoryStore(); + TestAHSWebApp testAHSWebApp = new TestAHSWebApp(); + testAHSWebApp.setApplicationHistoryStore(store); + ApplicationHistoryManager ahManager = + testAHSWebApp.mockApplicationHistoryManager(5, 5, 5); + return ahManager; + } + + public TestAHSWebServices() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.yarn.server.applicationhistoryservice.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + 
.contextPath("jersey-guice-filter").servletPath("/").build()); + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + } + + @Test + public void testInvalidUri() throws JSONException, Exception { + WebResource r = resource(); + String responseStr = ""; + try { + responseStr = + r.path("ws").path("v1").path("applicationhistory").path("bogus") + .accept(MediaType.APPLICATION_JSON).get(String.class); + fail("should have thrown exception on invalid uri"); + } catch (UniformInterfaceException ue) { + ClientResponse response = ue.getResponse(); + assertEquals(Status.NOT_FOUND, response.getClientResponseStatus()); + + WebServicesTestUtils.checkStringMatch( + "error string exists and shouldn't", "", responseStr); + } + } + + @Test + public void testInvalidUri2() throws JSONException, Exception { + WebResource r = resource(); + String responseStr = ""; + try { + responseStr = r.accept(MediaType.APPLICATION_JSON).get(String.class); + fail("should have thrown exception on invalid uri"); + } catch (UniformInterfaceException ue) { + ClientResponse response = ue.getResponse(); + assertEquals(Status.NOT_FOUND, response.getClientResponseStatus()); + WebServicesTestUtils.checkStringMatch( + "error string exists and shouldn't", "", responseStr); + } + } + + @Test + public void testInvalidAccept() throws JSONException, Exception { + WebResource r = resource(); + String responseStr = ""; + try { + responseStr = + r.path("ws").path("v1").path("applicationhistory") + .accept(MediaType.TEXT_PLAIN).get(String.class); + fail("should have thrown exception on invalid uri"); + } catch (UniformInterfaceException ue) { + ClientResponse response = ue.getResponse(); + assertEquals(Status.INTERNAL_SERVER_ERROR, + response.getClientResponseStatus()); + WebServicesTestUtils.checkStringMatch( + "error string exists and shouldn't", "", responseStr); + } + } + + @Test + public void testAppsQuery() throws Exception { + WebResource r = resource(); + ClientResponse response 
= + r.path("ws").path("v1").path("applicationhistory").path("apps") + .queryParam("state", YarnApplicationState.FINISHED.toString()) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject apps = json.getJSONObject("apps"); + assertEquals("incorrect number of elements", 1, apps.length()); + JSONArray array = apps.getJSONArray("app"); + assertEquals("incorrect number of elements", 5, array.length()); + } + + @Test + public void testSingleApp() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + WebResource r = resource(); + ClientResponse response = + r.path("ws").path("v1").path("applicationhistory").path("apps") + .path(appId.toString()).accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject app = json.getJSONObject("app"); + assertEquals(appId.toString(), app.getString("appId")); + assertEquals(appId.toString(), app.get("name")); + assertEquals(appId.toString(), app.get("diagnosticsInfo")); + assertEquals("test queue", app.get("queue")); + assertEquals("test user", app.get("user")); + assertEquals("test type", app.get("type")); + assertEquals(FinalApplicationStatus.UNDEFINED.toString(), + app.get("finalAppStatus")); + assertEquals(YarnApplicationState.FINISHED.toString(), app.get("appState")); + } + + @Test + public void testMultipleAttempts() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + WebResource r = resource(); + ClientResponse response = + r.path("ws").path("v1").path("applicationhistory").path("apps") + .path(appId.toString()).path("appattempts") + 
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject appAttempts = json.getJSONObject("appAttempts"); + assertEquals("incorrect number of elements", 1, appAttempts.length()); + JSONArray array = appAttempts.getJSONArray("appAttempt"); + assertEquals("incorrect number of elements", 5, array.length()); + } + + @Test + public void testSingleAttempt() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + WebResource r = resource(); + ClientResponse response = + r.path("ws").path("v1").path("applicationhistory").path("apps") + .path(appId.toString()).path("appattempts") + .path(appAttemptId.toString()).accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject appAttempt = json.getJSONObject("appAttempt"); + assertEquals(appAttemptId.toString(), appAttempt.getString("appAttemptId")); + assertEquals(appAttemptId.toString(), appAttempt.getString("host")); + assertEquals(appAttemptId.toString(), + appAttempt.getString("diagnosticsInfo")); + assertEquals("test tracking url", appAttempt.getString("trackingUrl")); + assertEquals(YarnApplicationAttemptState.FINISHED.toString(), + appAttempt.get("appAttemptState")); + } + + @Test + public void testMultipleContainers() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + WebResource r = resource(); + ClientResponse response = + r.path("ws").path("v1").path("applicationhistory").path("apps") + 
.path(appId.toString()).path("appattempts") + .path(appAttemptId.toString()).path("containers") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject containers = json.getJSONObject("containers"); + assertEquals("incorrect number of elements", 1, containers.length()); + JSONArray array = containers.getJSONArray("container"); + assertEquals("incorrect number of elements", 5, array.length()); + } + + @Test + public void testSingleContainer() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); + WebResource r = resource(); + ClientResponse response = + r.path("ws").path("v1").path("applicationhistory").path("apps") + .path(appId.toString()).path("appattempts") + .path(appAttemptId.toString()).path("containers") + .path(containerId.toString()).accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject container = json.getJSONObject("container"); + assertEquals(containerId.toString(), container.getString("containerId")); + assertEquals(containerId.toString(), container.getString("diagnosticsInfo")); + assertEquals("0", container.getString("allocatedMB")); + assertEquals("0", container.getString("allocatedVCores")); + assertEquals(NodeId.newInstance("localhost", 0).toString(), + container.getString("assignedNodeId")); + assertEquals(Priority.newInstance(containerId.getId()).toString(), + container.getString("priority")); + Configuration conf = new YarnConfiguration(); + 
assertEquals(WebAppUtils.getHttpSchemePrefix(conf) + + WebAppUtils.getAHSWebAppURLWithoutScheme(conf) + + "/applicationhistory/logs/localhost:0/container_0_0001_01_000001/" + + "container_0_0001_01_000001/test user", + container.getString("logUrl")); + assertEquals(ContainerState.COMPLETE.toString(), + container.getString("containerState")); + } + +}