http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java new file mode 100644 index 0000000..f2cad37 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.extractor.hdfs; + +import org.apache.eagle.app.utils.PathResolverHelper; +import org.apache.eagle.topology.TopologyCheckAppConfig; +import org.apache.eagle.topology.TopologyConstants; +import org.apache.eagle.topology.extractor.TopologyEntityParserResult; +import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity; +import org.apache.eagle.topology.extractor.TopologyEntityParser; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.utils.*; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.eagle.topology.TopologyConstants.*; +import static org.apache.eagle.topology.TopologyConstants.RACK_TAG; + +public class HdfsTopologyEntityParser implements TopologyEntityParser { + + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class); + private String [] namenodeUrls; + private String site; + private TopologyRackResolver rackResolver; + + private static final String JMX_URL = "/jmx?anonymous=true"; + private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem"; + private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo"; + + private static final String HA_STATE = "tag.HAState"; + private static final String HA_NAME = "tag.Hostname"; + private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB"; + private static final String CAPACITY_USED_GB = "CapacityUsedGB"; + private static final String BLOCKS_TOTAL = "BlocksTotal"; + private static final String LIVE_NODES = "LiveNodes"; + private static final String DEAD_NODES = "DeadNodes"; + + private static final String JN_STATUS = "NameJournalStatus"; + private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo"; + private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId"; + + private static final String DATA_NODE_NUM_BLOCKS = "numBlocks"; + private static final String DATA_NODE_USED_SPACE = "usedSpace"; + private static final String DATA_NODE_CAPACITY = "capacity"; + private static final String DATA_NODE_ADMIN_STATE = "adminState"; + private static final String DATA_NODE_FAILED_VOLUMN = "volfails"; + + private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned"; + private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned"; + + private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)"; + private static final String QJM_PATTERN = "([\\d\\.]+):\\d+"; + + public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) { + this.namenodeUrls = hdfsConfig.namenodeUrls; + this.site = site; + this.rackResolver = rackResolver; + } + + @Override + public TopologyEntityParserResult parse(long timestamp) throws IOException { + final TopologyEntityParserResult result = new TopologyEntityParserResult(); + result.setVersion(TopologyConstants.HadoopVersion.V2); + int numNamenode = 0; + for (String url : namenodeUrls) { + try { + final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp); + result.getMasterNodes().add(namenodeEntity); + numNamenode++; + if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) { + createSlaveNodeEntities(url, timestamp, result); + } + } catch (RuntimeException ex) { + ex.printStackTrace(); + } catch (IOException e) { + LOG.warn("Catch an IOException with url: {}", url); + } + } + double value = numNamenode * 1d / namenodeUrls.length; + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp)); + return result; + } + + private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException { + final String urlString = buildFSNamesystemURL(url); + final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString); + final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME); + if (bean == null || bean.getPropertyMap() == null) { + throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!"); + } + final String hostname = (String)bean.getPropertyMap().get(HA_NAME); + HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime); + final String state = (String)bean.getPropertyMap().get(HA_STATE); + result.setStatus(state); + final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB); + result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024)); + final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB); + result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024)); + final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL); + result.setNumBlocks(Integer.toString(blocksTotal)); + return result; + } + + private void createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException { + final String urlString = buildNamenodeInfo(url); + final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString); + final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO); + if (bean == null || bean.getPropertyMap() == null) { + throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!"); + } + createAllDataNodeEntities(bean, updateTime, result); + createAllJournalNodeEntities(bean, updateTime, result); + } + + private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException { + if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) { + return; + } + String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO); + JSONObject jsonObject = new JSONObject(jnInfoString); + long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID)); + + String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS); + JSONArray jsonArray = new JSONArray(journalnodeString); + JSONObject jsonMap = (JSONObject) jsonArray.get(0); + + Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>(); + String QJM = jsonMap.getString("manager"); + Pattern qjm = Pattern.compile(QJM_PATTERN); + Matcher jpmMatcher = qjm.matcher(QJM); + while (jpmMatcher.find()) { + String ip = jpmMatcher.group(1); + String hostname = EntityBuilderHelper.resolveHostByIp(ip); + HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime); + entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS); + journalNodesMap.put(ip, entity); + } + if (journalNodesMap.isEmpty()) { + LOG.warn("Fail to find journal node info in JMX"); + return; + } + + String stream = jsonMap.getString("stream"); + Pattern status = Pattern.compile(STATUS_PATTERN); + Matcher statusMatcher = status.matcher(stream); + long numLiveJournalNodes = 0; + while (statusMatcher.find()) { + numLiveJournalNodes++; + String ip = statusMatcher.group(1); + if (journalNodesMap.containsKey(ip)) { + long txid = Long.parseLong(statusMatcher.group(2)); + journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid); + journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS); + } + } + result.getMasterNodes().addAll(journalNodesMap.values()); + + double value = numLiveJournalNodes * 1d / journalNodesMap.size(); + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime)); + } + + private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException { + int numLiveNodes = 0; + int numLiveDecommNodes = 0; + int numDeadNodes = 0; + int numDeadDecommNodes = 0; + + String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES); + JSONTokener tokener = new JSONTokener(deadNodesStrings); + JSONObject jsonNodesObject = new JSONObject(tokener); + final JSONArray deadNodes = jsonNodesObject.names(); + for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) { + final String hostname = deadNodes.getString(i); + final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname); + HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, hostname, updateTime); + if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) { + ++numDeadDecommNodes; + entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS); + } else { + entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS); + } + ++numDeadNodes; + result.getSlaveNodes().add(entity); + } + LOG.info("Dead nodes " + numDeadNodes + ", dead but decommissioned nodes: " + numDeadDecommNodes); + + String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES); + tokener = new JSONTokener(liveNodesStrings); + jsonNodesObject = new JSONObject(tokener); + final JSONArray liveNodes = jsonNodesObject.names(); + for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) { + final String hostname = liveNodes.getString(i); + final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname); + + HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, hostname, updateTime); + final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY); + entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / 1024.0 / 1024.0 / 1024.0 / 1024.0)); + final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE); + entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / 1024.0 / 1024.0 / 1024.0 / 1024.0)); + final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS); + entity.setNumBlocks(Double.toString(blocksTotal.doubleValue())); + if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) { + final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN); + entity.setNumFailedVolumes(Double.toString(volFails.doubleValue())); + } + final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE); + if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) { + ++numLiveDecommNodes; + entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS); + } else { + entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS); + } + numLiveNodes++; + result.getSlaveNodes().add(entity); + } + LOG.info("Live nodes " + numLiveNodes + ", live but decommissioned nodes: " + numLiveDecommNodes); + + double value = numLiveNodes * 1.0d / result.getSlaveNodes().size(); + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime)); + } + + private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) { + HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity(); + entity.setTimestamp(updateTime); + Map<String, String> tags = new HashMap<String, String>(); + entity.setTags(tags); + tags.put(SITE_TAG, site); + tags.put(ROLE_TAG, roleType); + tags.put(HOSTNAME_TAG, hostname); + String rack = rackResolver.resolve(hostname); + tags.put(RACK_TAG, rack); + return entity; + } + + private String buildFSNamesystemURL(String url) { + return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME); + } + + private String buildNamenodeInfo(String url) { + return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO); + } + + @Override + public TopologyConstants.TopologyType getTopologyType() { + return TopologyConstants.TopologyType.HDFS; + } + + @Override + public TopologyConstants.HadoopVersion getHadoopVersion() { + return TopologyConstants.HadoopVersion.V2; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java new file mode 100644 index 0000000..af6bd51 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.extractor.mr; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.tuple.Values; +import org.apache.eagle.topology.TopologyCheckAppConfig; +import org.apache.eagle.topology.TopologyCheckMessageId; +import org.apache.eagle.topology.TopologyConstants; +import org.apache.eagle.topology.extractor.TopologyEntityParserResult; +import org.apache.eagle.topology.extractor.TopologyCrawler; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MRTopologyCrawler implements TopologyCrawler { + + private static final Logger LOG = LoggerFactory.getLogger(MRTopologyCrawler.class); + + private MRTopologyEntityParser parser; + private SpoutOutputCollector outputCollector; + + public MRTopologyCrawler(TopologyCheckAppConfig config, TopologyRackResolver rackResolver, SpoutOutputCollector collector) { + this.parser = new MRTopologyEntityParser(config.dataExtractorConfig.site, config.mrConfig, rackResolver); + this.outputCollector = collector; + } + + @Override + public void extract() { + long updateTimestamp = System.currentTimeMillis(); + TopologyEntityParserResult result = parser.parse(updateTimestamp); + if (result == null || result.getMasterNodes().isEmpty()) { + LOG.warn("No data fetched"); + return; + } + TopologyCheckMessageId messageId = new TopologyCheckMessageId(TopologyConstants.TopologyType.MR, updateTimestamp); + this.outputCollector.emit(new Values(TopologyConstants.MR_INSTANCE_SERVICE_NAME, result), messageId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java new file mode 100644 index 0000000..455b1d0 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.extractor.mr; + +import org.apache.eagle.app.utils.AppConstants; +import org.apache.eagle.app.utils.PathResolverHelper; +import org.apache.eagle.app.utils.connection.InputStreamUtils; +import org.apache.eagle.topology.TopologyCheckAppConfig; +import org.apache.eagle.topology.TopologyConstants; +import org.apache.eagle.topology.extractor.TopologyEntityParserResult; +import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity; +import org.apache.eagle.topology.extractor.TopologyEntityParser; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.utils.EntityBuilderHelper; +import org.apache.eagle.topology.utils.ServiceNotResponseException; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ConnectException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; + +import static org.apache.eagle.topology.TopologyConstants.*; + +public class MRTopologyEntityParser implements TopologyEntityParser { + + private String [] rmUrls; + private String historyServerUrl; + private String site; + private TopologyRackResolver rackResolver; + + private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true"; + private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info"; + + private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class); + private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); + + static { + OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); + } + + public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) { + this.site = site; + this.rmUrls = config.rmUrls; + this.historyServerUrl = config.historyServerUrl; + this.rackResolver = rackResolver; + } + + @Override + public TopologyConstants.HadoopVersion getHadoopVersion() { + return TopologyConstants.HadoopVersion.V2; + } + + @Override + public TopologyConstants.TopologyType getTopologyType() { + return TopologyConstants.TopologyType.MR; + } + + @Override + public TopologyEntityParserResult parse(long timestamp) { + final TopologyEntityParserResult result = new TopologyEntityParserResult(); + result.setVersion(TopologyConstants.HadoopVersion.V2); + + for (String url : rmUrls) { + try { + doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result); + } catch (ServiceNotResponseException ex) { + LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url); + // reSelect url + } + } + if (result.getMasterNodes().isEmpty()) { + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, 0, site, timestamp)); + } + doCheckHistoryServer(timestamp, result); + return result; + } + + private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) { + if (historyServerUrl == null || historyServerUrl.isEmpty()) { + return; + } + String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL); + double liveCount = 1; + try { + InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE); + } catch (ConnectException e) { + liveCount = 0; + } catch (Exception e) { + e.printStackTrace(); + return; + } + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime)); + } + + private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException { + InputStream is = null; + try { + is = InputStreamUtils.getInputStream(url, null, type); + } catch (ConnectException e) { + throw new ServiceNotResponseException(e); + } catch (Exception e) { + e.printStackTrace(); + } + return is; + } + + private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException { + + InputStream is = null; + try { + LOGGER.info("Going to query URL: " + url); + is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE); + YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class); + if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) { + throw new ServiceNotResponseException("Invalid result of URL: " + url); + } + int runningNodeCount = 0; + int lostNodeCount = 0; + int unhealthyNodeCount = 0; + final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode(); + for (YarnNodeInfo info : list) { + final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp); + if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) { + nodeManagerEntity.setHealthReport(info.getHealthReport()); + } + // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy + if (info.getState() != null) { + final String state = info.getState().toLowerCase(); + nodeManagerEntity.setStatus(state); + if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) { + ++runningNodeCount; + } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) { + ++lostNodeCount; + } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) { + ++unhealthyNodeCount; + } + } + result.getSlaveNodes().add(nodeManagerEntity); + } + LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount); + final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp); + resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS); + result.getMasterNodes().add(resourceManagerEntity); + double value = runningNodeCount * 1d / list.size(); + result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp)); + } catch (RuntimeException e) { + e.printStackTrace(); + } catch (IOException e) { + throw new ServiceNotResponseException(e); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + // Do nothing + } + } + } + } + + private String extractMasterHost(String url) { + Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url); + if (matcher.find()) { + return matcher.group(1); + } + return url; + } + + private String extractRack(YarnNodeInfo info) { + if (info.getRack() == null) { + return null; + } + String value = info.getRack(); + value = value.substring(value.lastIndexOf('/') + 1); + return value; + } + + private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) { + MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity(); + entity.setLastUpdateTime(updateTime); + Map<String, String> tags = new HashMap<String, String>(); + entity.setTags(tags); + tags.put(SITE_TAG, site); + tags.put(ROLE_TAG, roleType); + tags.put(HOSTNAME_TAG, hostname); + String rack = rackResolver.resolve(hostname); + tags.put(RACK_TAG, rack); + return entity; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java new file mode 100644 index 0000000..9315f78 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.extractor.mr; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class YarnNodeInfo { + + private String rack; + private String state; + private String id; + private String nodeHostName; + private String nodeHTTPAddress; + private String lastHealthUpdate; + private String healthReport; + private String numContainers; + private String usedMemoryMB; + private String availMemoryMB; + + + public String getRack() { + return rack; + } + public void setRack(String rack) { + this.rack = rack; + } + public String getState() { + return state; + } + public void setState(String state) { + this.state = state; + } + public String getId() { + return id; + } + public void setId(String id) { + this.id = id; + } + public String getNodeHostName() { + return nodeHostName; + } + public void setNodeHostName(String nodeHostName) { + this.nodeHostName = nodeHostName; + } + public String getNodeHTTPAddress() { + return nodeHTTPAddress; + } + public void setNodeHTTPAddress(String nodeHTTPAddress) { + this.nodeHTTPAddress = nodeHTTPAddress; + } + public String getLastHealthUpdate() { + return lastHealthUpdate; + } + public void setLastHealthUpdate(String lastHealthUpdate) { + this.lastHealthUpdate = lastHealthUpdate; + } + public String getHealthReport() { + return healthReport; + } + public void setHealthReport(String healthReport) { + this.healthReport = healthReport; + } + public String getNumContainers() { + return numContainers; + } + public void setNumContainers(String numContainers) { + this.numContainers = numContainers; + } + public String getUsedMemoryMB() { + return usedMemoryMB; + } + public void setUsedMemoryMB(String usedMemoryMB) { + this.usedMemoryMB = usedMemoryMB; + } + public String getAvailMemoryMB() { + return availMemoryMB; + } + public void setAvailMemoryMB(String availMemoryMB) { + this.availMemoryMB = availMemoryMB; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java new file mode 100644 index 0000000..83d8d7f --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.extractor.mr; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class YarnNodeInfoWrapper { + + private YarnNodeInfos infos; + + public YarnNodeInfos getNodes() { + return infos; + } + + public void setNodes(YarnNodeInfos infos) { + this.infos = infos; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java new file mode 100644 index 0000000..d715a1e --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.extractor.mr; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.util.List; + + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class YarnNodeInfos { + + private List<YarnNodeInfo> node; + + public List<YarnNodeInfo> getNode() { + return node; + } + + public void setNode(List<YarnNodeInfo> node) { + this.node = node; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java new file mode 100644 index 0000000..ab7b3cc --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.resolver; + +public interface TopologyRackResolver { + + /** + *resolve rack by hostname + * @return rack name + */ + String resolve(String hostname); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java new file mode 100644 index 0000000..8f0aff8 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.resolver.impl; + +import org.apache.eagle.topology.resolver.TopologyRackResolver; + +public class DefaultTopologyRackResolver implements TopologyRackResolver { + + private static final String DEFAULT_RACK_NAME = "default-rack"; + private String rack; + + public DefaultTopologyRackResolver() { + this.rack = DEFAULT_RACK_NAME; + } + + public DefaultTopologyRackResolver(String rack) { + this.rack = rack; + } + + /** + * If topology.script.file.name is unset, then the rack name for all hostnames is default-rack + * @param hostname + * @return rack + */ + @Override + public String resolve(String hostname) { + return rack; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java new file mode 100644 index 0000000..df0f863 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.resolver.impl; + +import org.apache.eagle.topology.resolver.TopologyRackResolver; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * resolve rack by hostname + */ +public class IPMaskTopologyRackResolver implements TopologyRackResolver { + + private final int DEFAULT_RACK_POS = 2; + private int rackPos; + + public IPMaskTopologyRackResolver() { + this.rackPos = DEFAULT_RACK_POS; + } + + public IPMaskTopologyRackResolver(int rackPos) { + this.rackPos = (rackPos > 3 || rackPos < 0) ? DEFAULT_RACK_POS : rackPos; + } + + @Override + public String resolve(String hostname) { + String result = null; + try { + InetAddress address = InetAddress.getByName(hostname); + result = "rack" + (int)(address.getAddress()[rackPos] & 0xff); + } catch (UnknownHostException e) { + //e.printStackTrace(); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java new file mode 100644 index 0000000..f2f3d41 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.topology.storm; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.apache.eagle.alert.utils.DateTimeUtil; +import org.apache.eagle.topology.TopologyCheckAppConfig; +import org.apache.eagle.topology.TopologyConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Calendar; +import java.util.Map; + +public class TopologyCheckAppSpout extends BaseRichSpout { + + private TopologyDataExtractor extractor; + private TopologyCheckAppConfig topologyCheckAppConfig; + + private long lastFetchTime; + private long fetchInterval; + + private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckAppSpout.class); + + public TopologyCheckAppSpout(TopologyCheckAppConfig topologyCheckAppConfig) { + this.topologyCheckAppConfig = topologyCheckAppConfig; + this.lastFetchTime = 0; + this.fetchInterval = topologyCheckAppConfig.dataExtractorConfig.fetchDataIntervalInSecs * DateTimeUtil.ONESECOND; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(TopologyConstants.SERVICE_NAME_FIELD, TopologyConstants.TOPOLOGY_DATA_FIELD)); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.extractor = new TopologyDataExtractor(topologyCheckAppConfig, collector); + } + + @Override + public void nextTuple() { + long currentTime = System.currentTimeMillis(); + Calendar calendar = Calendar.getInstance(); + if (currentTime > lastFetchTime + fetchInterval) { + calendar.setTimeInMillis(this.lastFetchTime); + LOG.info("Last fetch time = {}", calendar.getTime()); + this.extractor.crawl(); + lastFetchTime = currentTime; + } + } + + @Override + public void fail(Object msgId) { + LOG.warn("ack {}", msgId.toString()); + } + + @Override + public void ack(Object msgId) { + LOG.info("ack {}", msgId.toString()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java new file mode 100644 index 0000000..32eae9b --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.storm; + +import backtype.storm.spout.SpoutOutputCollector; +import org.apache.eagle.topology.TopologyCheckAppConfig; +import org.apache.eagle.topology.extractor.TopologyCrawler; +import org.apache.eagle.topology.extractor.TopologyExtractorFactory; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +import static org.apache.eagle.topology.TopologyConstants.*; + +public class TopologyDataExtractor { + + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyDataExtractor.class); + private static final int MIN_WAIT_TIME_SECS = 60; + private static final double FETCH_TIMEOUT_FACTOR = 0.8; + + private TopologyCheckAppConfig config; + private List<TopologyCrawler> extractors; + private ExecutorService executorService; + + public TopologyDataExtractor(TopologyCheckAppConfig topologyCheckAppConfig, SpoutOutputCollector collector) { + this.config = topologyCheckAppConfig; + extractors = getExtractors(collector); + executorService = Executors.newFixedThreadPool(topologyCheckAppConfig.dataExtractorConfig.parseThreadPoolSize); + } + + public void crawl() { + List<Future<?>> futures = new ArrayList<>(); + for (TopologyCrawler topologyExtractor : extractors) { + futures.add(executorService.submit(new DataFetchRunnableWrapper(topologyExtractor))); + } + long fetchTimeoutSecs = (long) Math.max(config.dataExtractorConfig.fetchDataIntervalInSecs * FETCH_TIMEOUT_FACTOR, MIN_WAIT_TIME_SECS); + futures.forEach(future -> { + try { + future.get(fetchTimeoutSecs, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.info("Caught an overtime exception with message" + e.getMessage()); + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); + } + }); + } + + private List<TopologyCrawler> getExtractors(SpoutOutputCollector collector) { + List<TopologyCrawler> extractors = new ArrayList<>(); + TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); + if (config.dataExtractorConfig.resolverCls != null) { + try { + rackResolver = config.dataExtractorConfig.resolverCls.newInstance(); + } catch (InstantiationException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + for (TopologyType type : config.topologyTypes) { + try { + extractors.add(TopologyExtractorFactory.create(type, config, rackResolver, collector)); + } catch (Exception e) { + e.printStackTrace(); + } + } + return extractors; + } + + private static class DataFetchRunnableWrapper implements Runnable { + + private TopologyCrawler topologyExtractor; + + public DataFetchRunnableWrapper(TopologyCrawler topologyExtractor) { + this.topologyExtractor = topologyExtractor; + } + + @Override + public void run() { + topologyExtractor.extract(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java new file mode 100644 index 0000000..490f427 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.storm; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.EagleServiceClientException; +import org.apache.eagle.service.client.EagleServiceConnector; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.apache.eagle.topology.TopologyCheckAppConfig; +import org.apache.eagle.topology.TopologyConstants; +import org.apache.eagle.topology.extractor.TopologyEntityParserResult; +import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +public class TopologyDataPersistBolt extends BaseRichBolt { + + private TopologyCheckAppConfig config; + private IEagleServiceClient client; + private OutputCollector collector; + + private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class); + + public TopologyDataPersistBolt(TopologyCheckAppConfig config) { + this.config = config; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.config)); + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + if (input == null) { + return; + } + String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD); + TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD); + Set<String> availableHostnames = new HashSet<String>(); + List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>(); + List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>(); + + filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes()); + filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes()); + + String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site); + try { + GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send(); + if (response.isSuccess() && response.getObj() != null) { + for (TopologyBaseAPIEntity entity : response.getObj()) { + if (!availableHostnames.contains(generateKey(entity))) { + entitiesForDeletion.add(entity); + } + } + } + deleteEntities(entitiesForDeletion, serviceName); + writeEntities(entitiesToWrite, serviceName); + writeEntities(result.getMetrics(), serviceName); + this.collector.ack(input); + } catch (Exception e) { + e.printStackTrace(); + this.collector.fail(input); + } + } + + private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) { + for (TopologyBaseAPIEntity entity : entities) { + availableHostnames.add(generateKey(entity)); + entitiesToWrite.add(entity); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) { + try { + GenericServiceAPIResponseEntity response = client.delete(entities); + if (!response.isSuccess()) { + LOG.error("Got exception from eagle service: " + response.getException()); + } else { + LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName); + } + } catch (EagleServiceClientException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + entities.clear(); + } + + private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) { + try { + GenericServiceAPIResponseEntity response = client.create(entities); + if (!response.isSuccess()) { + LOG.error("Got exception from eagle service: " + response.getException()); + } else { + LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName); + } + } catch (Exception e) { + LOG.error("cannot create entities successfully", e); + } + entities.clear(); + } + + private String generateKey(TopologyBaseAPIEntity entity) { + return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG), + entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG), + entity.getTags().get(TopologyConstants.ROLE_TAG)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java new file mode 100644 index 0000000..55c6183 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.utils; + +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.topology.TopologyConstants; +import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity; +import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.eagle.topology.TopologyConstants.*; + +public class EntityBuilderHelper { + + public static String resolveHostByIp(String ip) { + InetAddress addr = null; + try { + addr = InetAddress.getByName(ip); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + return addr.getHostName(); + } + + public static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) { + GenericMetricEntity metricEntity = new GenericMetricEntity(); + metricEntity.setTimestamp(timestamp); + metricEntity.setTags(tags); + metricEntity.setPrefix(metricName); + metricEntity.setValue(new double[]{value}); + return metricEntity; + } + + public static GenericMetricEntity generateMetric(String role, double value, String site, long timestamp) { + Map<String, String> tags = new HashMap<>(); + tags.put(TopologyConstants.SITE_TAG, site); + tags.put(TopologyConstants.ROLE_TAG, role); + String metricName = String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, role); + return EntityBuilderHelper.metricWrapper(timestamp, metricName, value, tags); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java new file mode 100644 index 0000000..d9ac3e0 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.utils; + +import java.util.Map; + +public class JMXBean { + + private Map<String, Object> propertyMap; + + public Map<String, Object> getPropertyMap() { + return propertyMap; + } + + public void setPropertyMap(Map<String, Object> propertyMap) { + this.propertyMap = propertyMap; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java new file mode 100644 index 0000000..fa4c71f --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.utils; + +import org.apache.eagle.app.utils.connection.URLConnectionUtils; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URLConnection; +import java.util.HashMap; +import java.util.Map; + +/** + * Helper class to query Hadoop JMX servlets + */ +public final class JMXQueryHelper { + + private static final int DEFAULT_QUERY_TIMEOUT = 30 * 60 * 1000; + private static final Logger LOG = LoggerFactory.getLogger(JMXQueryHelper.class); + + public static Map<String, JMXBean> query(String jmxQueryUrl) throws JSONException, IOException { + LOG.info("Going to query JMX url: " + jmxQueryUrl); + InputStream is = null; + try { + final URLConnection connection = URLConnectionUtils.getConnection(jmxQueryUrl); + connection.setReadTimeout(DEFAULT_QUERY_TIMEOUT); + is = connection.getInputStream(); + return parseStream(is); + } catch (Exception e) { + e.printStackTrace(); + return null; + } finally { + if (is != null) { + is.close(); + } + } + } + + public static Map<String, JMXBean> parseStream(InputStream is) { + final Map<String, JMXBean> resultMap = new HashMap<String, JMXBean>(); + final JSONTokener tokener = new JSONTokener(is); + final JSONObject jsonBeansObject = new JSONObject(tokener); + final JSONArray jsonArray = jsonBeansObject.getJSONArray("beans"); + int size = jsonArray.length(); + for (int i = 0; i < size; ++i) { + final JSONObject obj = (JSONObject)jsonArray.get(i); + final JMXBean bean = new JMXBean(); + final Map<String, Object> map = new HashMap<String, Object>(); + bean.setPropertyMap(map); + final JSONArray names = obj.names(); + int jsonSize = names.length(); + for (int j = 0 ; j < jsonSize; ++j) { + final String key = names.getString(j); + Object value = obj.get(key); + map.put(key, value); + } + final String nameString = (String) map.get("name"); + resultMap.put(nameString, bean); + } + return resultMap; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java new file mode 100644 index 0000000..48c9133 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology.utils; + +import java.io.IOException; + +public class ServiceNotResponseException extends IOException { + + private static final long serialVersionUID = -2425311876734366496L; + + /** + * Default constructor of FeederException + */ + public ServiceNotResponseException() { + super(); + } + + /** + * Constructor of FeederException + * + * @param message error message + */ + public ServiceNotResponseException(String message) { + super(message); + } + + /** + * Constructor of FeederException + * + * @param message error message + * @param cause the cause of the exception + * + */ + public ServiceNotResponseException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructor of FeederException + * + * @param cause the cause of the exception + */ + public ServiceNotResponseException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml new file mode 100644 index 0000000..0a8f1d1 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -0,0 +1,155 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<application> + <type>TOPOLOGY_HEALTH_CHECK_APP</type> + <name>Topology Health Check</name> + <version>0.5.0-incubating</version> + <appClass>org.apache.eagle.topology.TopologyCheckApp</appClass> + <viewPath>/apps/jpm</viewPath> + <configuration> + <!-- org.apache.eagle.topology.TopologyCheckApp --> + <property> + <name>dataExtractorConfig.site</name> + <displayName>site</displayName> + <description>Site</description> + <value>sandbox</value> + </property> + <property> + <name>dataExtractorConfig.fetchDataIntervalInSecs</name> + <displayName>FetchDataIntervalInSecs</displayName> + <description>Fetch Data Interval in Secs</description> + <value>300</value> + </property> + <property> + <name>dataExtractorConfig.parseThreadPoolSize</name> + <displayName>parseThreadPoolSize</displayName> + <description>Parser Thread Pool Size</description> + <value>5</value> + </property> + <property> + <name>dataExtractorConfig.numDataFetcherSpout</name> + <displayName>numDataFetcherSpout</displayName> + <description>Spout Task Number</description> + <value>1</value> + </property> + <property> + <name>dataExtractorConfig.numEntityPersistBolt</name> + <displayName>numEntityPersistBolt</displayName> + <description>Bolt Task Number</description> + <value>1</value> + </property> + <property> + <name>dataSourceConfig.hbase.zkQuorum</name> + <displayName>zkQuorum</displayName> + <description>Zookeeper Quorum</description> + <value>sandbox.hortonworks.com:2181</value> + </property> + <property> + <name>dataSourceConfig.hbase.zkZnodeParent</name> + <displayName>zkZnodeParent</displayName> + <description>Hbase Zookeeper Znode Parent Root</description> + <value>/hbase-unsecure</value> + </property> + <property> + <name>dataSourceConfig.hbase.zkPropertyClientPort</name> + <displayName>zkPropertyClientPort</displayName> + <description>Hbase Zookeeper Client Port</description> + <value>2181</value> + </property> + <property> + <name>dataSourceConfig.hbase.kerberos.master.principal</name> + <displayName>hbaseMasterPrincipal</displayName> + <description>Hbase Master Principal</description> + <value>hadoop/_h...@example.com</value> + </property> + <property> + <name>dataSourceConfig.hbase.kerberos.eagle.keytab</name> + <displayName>eagleKeytab</displayName> + <description>Eagle keytab</description> + <value></value> + </property> + <property> + <name>dataSourceConfig.hbase.kerberos.master.principal</name> + <displayName>hbaseMasterPrincipal</displayName> + <description>Hbase Master Principal</description> + <value>hadoop/_h...@example.com</value> + </property> + + <property> + <name>dataSourceConfig.hdfs.namenodeUrl</name> + <displayName>hdfsNamenodeUrl</displayName> + <description>Hdfs Namenode Web URL</description> + <value>http://sandbox.hortonworks.com:50070</value> + </property> + + <property> + <name>dataSourceConfig.mr.rmUrl</name> + <displayName>resourceManagerUrl</displayName> + <description>Resource Manager URL</description> + <value>http://sandbox.hortonworks.com:8088</value> + </property> + + <property> + <name>dataSourceConfig.mr.historyServerUrl</name> + <displayName>historyServerUrl</displayName> + <description>History Server URL</description> + <value></value> + </property> + + <property> + <name>eagleProps.eagleService.host</name> + <description>eagleProps.eagleService.host</description> + <value>localhost</value> + </property> + <property> + <name>eagleProps.eagleService.port</name> + <description>eagleProps.eagleService.port</description> + <value>9090</value> + </property> + <property> + <name>eagleProps.eagleService.username</name> + <description>eagleProps.eagleService.username</description> + <value>admin</value> + </property> + <property> + <name>eagleProps.eagleService.password</name> + <description>eagleProps.eagleService.password</description> + <value>secret</value> + </property> + <property> + <name>eagleProps.eagleService.basePath</name> + <description>eagleProps.eagleService.basePath</description> + <value>/rest</value> + </property> + <property> + <name>eagleProps.eagleService.readTimeOutSeconds</name> + <displayName>eagleProps.eagleService.readTimeOutSeconds</displayName> + <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description> + <value>2</value> + </property> + + </configuration> + <docs> + <install> + </install> + <uninstall> + </uninstall> + </docs> +</application> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..4e08313 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.eagle.topology.TopologyCheckAppProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf new file mode 100644 index 0000000..cbc7ac1 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + appId : "topologyCheckApp", + mode : "LOCAL", + workers : 1, + + dataExtractorConfig : { + "site": "sandbox", + "fetchDataIntervalInSecs": 300, + "parseThreadPoolSize": 5, + "numDataFetcherSpout" : 1, + "numEntityPersistBolt" : 1, + "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver" + } + + dataSourceConfig : { + hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070", + hbase: { + zkQuorum: "sandbox.hortonworks.com", + zkPropertyClientPort : "2181", + zkZnodeParent: "/hbase-unsecure", + zkRetryTimes : "5", + kerberos : { + master.principal : "hadoop/_h...@example.com" + eagle.principal: "", #if not need, then empty + eagle.keytab: "" + } + }, + mr: { + rmUrl: "http://sandbox.hortonworks.com:8088", + historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty + } + } + + eagleProps : { + "mailHost" : "abc.com", + "mailDebug" : "true", + eagleService.host:"localhost", + eagleService.port: 9090, + eagleService.username: "admin", + eagleService.password : "secret", + eagleService.basePath : "/rest", + eagleService.readTimeOutSeconds : 20, + eagleService.maxFlushNum : 500 + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties b/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties new file mode 100644 index 0000000..6b8c8d6 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, DRFA + +eagle.log.dir=../logs +eagle.log.file=eagle.log + +# standard output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n + +# Daily Rolling File Appender + log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender + log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} + log4j.appender.DRFA.DatePattern=.yyyy-MM-dd +## 30-day backup +# log4j.appender.DRFA.MaxBackupIndex=30 + log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java new file mode 100644 index 0000000..6956ef1 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.topology.extractor.hbase.HbaseTopologyCrawler; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver; +import org.junit.Ignore; +import org.junit.Test; + +public class TestHbaseTopologyCrawler { + + @Test @Ignore + public void test() { + Config config = ConfigFactory.load(); + + TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config); + TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); + HbaseTopologyCrawler crawler = new HbaseTopologyCrawler(topologyCheckAppConfig, rackResolver, null); + crawler.extract(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java new file mode 100644 index 0000000..5069a3b --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.topology.extractor.hdfs.HdfsTopologyCrawler; +import org.apache.eagle.topology.resolver.TopologyRackResolver; +import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver; +import org.junit.Ignore; +import org.junit.Test; + +public class TestHdfsTopologyCrawler { + + @Test @Ignore + public void test() { + Config config = ConfigFactory.load(); + + TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config); + TopologyRackResolver rackResolver = new DefaultTopologyRackResolver(); + HdfsTopologyCrawler crawler = new HdfsTopologyCrawler(topologyCheckAppConfig, rackResolver, null); + crawler.extract(); + } +}