AMBARI-21507 Log Search Solr output properties should be provided by the Config API (mgergely)
Change-Id: I32ec1afa8549b7e065fa904f2de2db0b255f690f Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dc85e67d Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dc85e67d Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dc85e67d Branch: refs/heads/trunk Commit: dc85e67d7d1f1287398824541c99b7f3872796a0 Parents: e3a50d9 Author: Miklos Gergely <mgerg...@hortonworks.com> Authored: Thu Jul 27 16:53:34 2017 +0200 Committer: Miklos Gergely <mgerg...@hortonworks.com> Committed: Thu Jul 27 16:53:34 2017 +0200 ---------------------------------------------------------------------- .../logsearch/config/api/LogSearchConfig.java | 41 +++- .../config/api/OutputConfigMonitor.java | 44 +++++ .../model/outputconfig/OutputProperties.java | 23 +++ .../outputconfig/OutputSolrProperties.java | 26 +++ .../config/api/LogSearchConfigClass1.java | 19 +- .../config/api/LogSearchConfigClass2.java | 19 +- .../config/zookeeper/LogSearchConfigZK.java | 87 +++++++-- .../impl/OutputSolrPropertiesImpl.java | 46 +++++ .../org/apache/ambari/logfeeder/LogFeeder.java | 5 +- .../ambari/logfeeder/common/ConfigHandler.java | 11 +- .../logfeeder/common/LogEntryParseTester.java | 2 +- .../logfeeder/input/InputConfigUploader.java | 2 +- .../ambari/logfeeder/input/InputSimulate.java | 1 + .../apache/ambari/logfeeder/output/Output.java | 36 +++- .../ambari/logfeeder/output/OutputManager.java | 11 ++ .../ambari/logfeeder/output/OutputSolr.java | 187 +++++++++++-------- .../ambari/logfeeder/output/OutputSolrTest.java | 29 ++- .../logsearch/conf/SolrAuditLogPropsConfig.java | 5 + .../conf/SolrEventHistoryPropsConfig.java | 5 + .../ambari/logsearch/conf/SolrPropsConfig.java | 2 + .../conf/SolrServiceLogPropsConfig.java | 5 + .../configurer/LogSearchConfigConfigurer.java | 3 + .../configurer/SolrCollectionConfigurer.java | 5 +- .../ambari/logsearch/dao/AuditSolrDao.java | 1 + .../logsearch/dao/ServiceLogsSolrDao.java | 1 + .../ambari/logsearch/dao/SolrDaoBase.java | 13 +- .../handler/CreateCollectionHandler.java | 12 +- .../logsearch/manager/ShipperConfigManager.java | 10 +- .../logfeeder/shipper-conf/output.config.json | 10 +- .../server/upgrade/UpgradeCatalog300.java | 42 +++-- .../0.5.0/properties/output.config.json.j2 | 8 +- .../LOGSEARCH/0.5.0/service_advisor.py | 33 ++-- .../server/upgrade/UpgradeCatalog300Test.java | 73 ++++++-- 33 files changed, 625 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java index 6c5cefd..76be392 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; /** @@ -57,14 +58,23 @@ public interface LogSearchConfig extends Closeable { List<String> getServices(String clusterName); /** - * Checks if input configuration exists. + * Checks if input configuration exists. Will be used only in LOGFEEDER mode. + * + * @param serviceName The name of the service looked for. + * @return If input configuration exists for the service. + * @throws Exception + */ + boolean inputConfigExistsLogFeeder(String serviceName) throws Exception; + + /** + * Checks if input configuration exists. Will be used only in SERVER mode. * * @param clusterName The name of the cluster where the service is looked for. * @param serviceName The name of the service looked for. * @return If input configuration exists for the service. * @throws Exception */ - boolean inputConfigExists(String clusterName, String serviceName) throws Exception; + boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception; /** * Returns the global configurations of a cluster. Will be used only in SERVER mode. @@ -140,4 +150,31 @@ public interface LogSearchConfig extends Closeable { */ void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor, String clusterName) throws Exception; + + /** + * Saves the properties of an Output Solr. Will be used only in SERVER mode. + * + * @param type The type of the Output Solr. + * @param outputSolrProperties The properties of the Output Solr. + * @throws Exception + */ + void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception; + + /** + * Get the properties of an Output Solr. Will be used only in LOGFEEDER mode. + * + * @param type The type of the Output Solr. + * @return The properties of the Output Solr, or null if it doesn't exist. + * @throws Exception + */ + OutputSolrProperties getOutputSolrProperties(String type) throws Exception; + + /** + * Saves the properties of an Output Solr. Will be used only in LOGFEEDER mode. + * + * @param type The type of the Output Solr. + * @param outputConfigMonitors The monitors which want to watch the output config changes. + * @throws Exception + */ + void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception; } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java new file mode 100644 index 0000000..c54626d --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api; + +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties; + +/** + * Monitors output configuration changes. + */ +public interface OutputConfigMonitor { + /** + * @return The destination of the output. + */ + String getDestination(); + + /** + * @return The type of the output logs. + */ + String getOutputType(); + + /** + * Will be called whenever there is a change in the configuration of the output. + * + * @param outputProperties The modified properties of the output. + */ + void outputConfigChanged(OutputProperties outputProperties); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java new file mode 100644 index 0000000..affd5b9 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api.model.outputconfig; + +public interface OutputProperties { +} http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java new file mode 100644 index 0000000..586e785 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.api.model.outputconfig; + +public interface OutputSolrProperties extends OutputProperties { + String getCollection(); + + String getSplitIntervalMins(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java index 28844d5..e308346 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java @@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; public class LogSearchConfigClass1 implements LogSearchConfig { @@ -33,7 +34,12 @@ public class LogSearchConfigClass1 implements LogSearchConfig { public void init(Component component, Map<String, String> properties, String clusterName) {} @Override - public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { + public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception { + return false; + } + + @Override + public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception { return false; } @@ -74,5 +80,16 @@ public class LogSearchConfigClass1 implements LogSearchConfig { } @Override + public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {} + + @Override + public OutputSolrProperties getOutputSolrProperties(String type) { + return null; + } + + @Override + public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {} + + @Override public void close() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java index 5934fa6..b64dae8 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java @@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; public class LogSearchConfigClass2 implements LogSearchConfig { @@ -33,7 +34,12 @@ public class LogSearchConfigClass2 implements LogSearchConfig { public void init(Component component, Map<String, String> properties, String clusterName) {} @Override - public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { + public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception { + return false; + } + + @Override + public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception { return false; } @@ -74,5 +80,16 @@ public class LogSearchConfigClass2 implements LogSearchConfig { } @Override + public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {} + + @Override + public OutputSolrProperties getOutputSolrProperties(String type) { + return null; + } + + @Override + public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {} + + @Override public void close() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java index fdd8ed6..387d0c6 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java @@ -27,12 +27,15 @@ import java.util.TreeMap; import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter; import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson; import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl; +import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor; import org.apache.commons.collections.MapUtils; @@ -98,9 +101,12 @@ public class LogSearchConfigZK implements LogSearchConfig { private Map<String, String> properties; private CuratorFramework client; - private TreeCache cache; private Gson gson; + private TreeCache serverCache; + private TreeCache logFeederClusterCache; + private TreeCache outputCache; + @Override public void init(Component component, Map<String, String> properties, String clusterName) throws Exception { this.properties = properties; @@ -115,28 +121,39 @@ public class LogSearchConfigZK implements LogSearchConfig { .build(); client.start(); + outputCache = new TreeCache(client, "/output"); + outputCache.start(); if (component == Component.SERVER) { if (client.checkExists().forPath("/") == null) { client.create().creatingParentContainersIfNeeded().forPath("/"); } - cache = new TreeCache(client, "/"); - cache.start(); + if (client.checkExists().forPath("/output") == null) { + client.create().creatingParentContainersIfNeeded().forPath("/output"); + } + serverCache = new TreeCache(client, "/"); + serverCache.start(); } else { while (client.checkExists().forPath("/") == null) { LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds"); Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000); } - cache = new TreeCache(client, String.format("/%s", clusterName)); + logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName)); } gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create(); } @Override - public boolean inputConfigExists(String clusterName, String serviceName) throws Exception { + public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception { + String nodePath = String.format("/input/%s", serviceName); + return logFeederClusterCache.getCurrentData(nodePath) != null; + } + + @Override + public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception { String nodePath = String.format("/%s/input/%s", clusterName, serviceName); - return cache.getCurrentData(nodePath) != null; + return serverCache.getCurrentData(nodePath) != null; } @Override @@ -261,8 +278,8 @@ public class LogSearchConfigZK implements LogSearchConfig { } } }; - cache.getListenable().addListener(listener); - cache.start(); + logFeederClusterCache.getListenable().addListener(listener); + logFeederClusterCache.start(); } private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) { @@ -270,7 +287,7 @@ public class LogSearchConfigZK implements LogSearchConfig { String data = InputConfigGson.gson.toJson(globalConfigNode); try { - if (cache.getCurrentData(globalConfigNodePath) != null) { + if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) { client.setData().forPath(globalConfigNodePath, data.getBytes()); } else { client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes()); @@ -283,14 +300,14 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public List<String> getServices(String clusterName) { String parentPath = String.format("/%s/input", clusterName); - Map<String, ChildData> serviceNodes = cache.getCurrentChildren(parentPath); + Map<String, ChildData> serviceNodes = serverCache.getCurrentChildren(parentPath); return new ArrayList<String>(serviceNodes.keySet()); } @Override public String getGlobalConfigs(String clusterName) { String globalConfigNodePath = String.format("/%s/global", clusterName); - return new String(cache.getCurrentData(globalConfigNodePath).getData()); + return new String(serverCache.getCurrentData(globalConfigNodePath).getData()); } @Override @@ -299,7 +316,7 @@ public class LogSearchConfigZK implements LogSearchConfig { JsonArray globalConfigs = (JsonArray) new JsonParser().parse(globalConfigData); InputAdapter.setGlobalConfigs(globalConfigs); - ChildData childData = cache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName)); + ChildData childData = serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName)); return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class); } @@ -320,7 +337,7 @@ public class LogSearchConfigZK implements LogSearchConfig { for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) { String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, e.getKey()); String logLevelFilterJson = gson.toJson(e.getValue()); - String currentLogLevelFilterJson = new String(cache.getCurrentData(nodePath).getData()); + String currentLogLevelFilterJson = new String(serverCache.getCurrentData(nodePath).getData()); if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) { client.setData().forPath(nodePath, logLevelFilterJson.getBytes()); LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName); @@ -331,7 +348,7 @@ public class LogSearchConfigZK implements LogSearchConfig { @Override public LogLevelFilterMap getLogLevelFilters(String clusterName) { String parentPath = String.format("/%s/loglevelfilter", clusterName); - Map<String, ChildData> logLevelFilterNodes = cache.getCurrentChildren(parentPath); + Map<String, ChildData> logLevelFilterNodes = serverCache.getCurrentChildren(parentPath); TreeMap<String, LogLevelFilter> filters = new TreeMap<>(); for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) { LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class); @@ -387,6 +404,48 @@ public class LogSearchConfigZK implements LogSearchConfig { } @Override + public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception { + String nodePath = String.format("/output/solr/%s", type); + String data = gson.toJson(outputSolrProperties); + if (outputCache.getCurrentData(nodePath) == null) { + client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, data.getBytes()); + } else { + client.setData().forPath(nodePath, data.getBytes()); + } + } + + @Override + public OutputSolrProperties getOutputSolrProperties(String type) throws Exception { + String nodePath = String.format("/output/solr/%s", type); + ChildData currentData = outputCache.getCurrentData(nodePath); + return currentData == null ? + null : + gson.fromJson(new String(currentData.getData()), OutputSolrPropertiesImpl.class); + } + + @Override + public void monitorOutputProperties(final List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception { + TreeCacheListener listener = new TreeCacheListener() { + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + if (event.getType() != Type.NODE_UPDATED) { + return; + } + + LOG.info("Output config updated: " + event.getData().getPath()); + for (OutputConfigMonitor monitor : outputConfigMonitors) { + String monitorPath = String.format("/output/%s/%s", monitor.getDestination(), monitor.getOutputType()); + if (monitorPath.equals(event.getData().getPath())) { + String nodeData = new String(event.getData().getData()); + OutputSolrProperties outputSolrProperties = gson.fromJson(nodeData, OutputSolrPropertiesImpl.class); + monitor.outputConfigChanged(outputSolrProperties); + } + } + } + }; + outputCache.getListenable().addListener(listener); + } + + @Override public void close() { LOG.info("Closing ZooKeeper Connection"); client.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java new file mode 100644 index 0000000..4b9f54c --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl; + +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; + +import com.google.gson.annotations.SerializedName; + +public class OutputSolrPropertiesImpl implements OutputSolrProperties { + private final String collection; + + @SerializedName("split_interval_mins") + private final String splitIntervalMins; + + public OutputSolrPropertiesImpl(String collection, String splitIntervalMins) { + this.collection = collection; + this.splitIntervalMins = splitIntervalMins; + } + + @Override + public String getCollection() { + return collection; + } + + @Override + public String getSplitIntervalMins() { + return splitIntervalMins; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index 59c2a22..ba3412b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -53,7 +53,7 @@ public class LogFeeder { private final LogFeederCommandLine cli; - private ConfigHandler configHandler = new ConfigHandler(); + private ConfigHandler configHandler; private LogSearchConfig config; private MetricsManager metricsManager = new MetricsManager(); @@ -78,11 +78,12 @@ public class LogFeeder { private void init() throws Throwable { long startTime = System.currentTimeMillis(); - configHandler.init(); SSLUtil.ensureStorePasswords(); config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()), LogFeederUtil.getClusterName(), LogSearchConfigZK.class); + configHandler = new ConfigHandler(config); + configHandler.init(); LogLevelFilterHandler.init(config); InputConfigUploader.load(config); config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederUtil.getClusterName()); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 5bf074c..30b61a1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -48,6 +48,7 @@ import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ambari.logfeeder.util.AliasUtil.AliasType; import org.apache.ambari.logsearch.config.api.InputConfigMonitor; +import org.apache.ambari.logsearch.config.api.LogSearchConfig; import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor; @@ -85,6 +86,8 @@ public class ConfigHandler implements InputConfigMonitor { ) private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number"; + private final LogSearchConfig logSearchConfig; + private final OutputManager outputManager = new OutputManager(); private final InputManager inputManager = new InputManager(); @@ -97,7 +100,9 @@ public class ConfigHandler implements InputConfigMonitor { private boolean simulateMode = false; - public ConfigHandler() {} + public ConfigHandler(LogSearchConfig logSearchConfig) { + this.logSearchConfig = logSearchConfig; + } public void init() throws Exception { loadConfigFiles(); @@ -106,6 +111,8 @@ public class ConfigHandler implements InputConfigMonitor { inputManager.init(); outputManager.init(); + + logSearchConfig.monitorOutputProperties(outputManager.getOutputsToMonitor()); } private void loadConfigFiles() throws Exception { @@ -271,6 +278,7 @@ public class ConfigHandler implements InputConfigMonitor { } output.setDestination(value); output.loadConfig(map); + output.setLogSearchConfig(logSearchConfig); // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input if (output.isEnabled()) { @@ -387,6 +395,7 @@ public class ConfigHandler implements InputConfigMonitor { // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager for (Output output : InputSimulate.getSimulateOutputs()) { + output.setLogSearchConfig(logSearchConfig); outputManager.add(output); usedOutputSet.add(output); } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java index 5356159..ec29f69 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java @@ -73,7 +73,7 @@ public class LogEntryParseTester { public Map<String, Object> parse() throws Exception { InputConfig inputConfig = getInputConfig(); - ConfigHandler configHandler = new ConfigHandler(); + ConfigHandler configHandler = new ConfigHandler(null); Input input = configHandler.getTestInput(inputConfig, logId); final Map<String, Object> result = new HashMap<>(); input.getFirstFilter().init(); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java index 09fc3f5..10642d1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java @@ -84,7 +84,7 @@ public class InputConfigUploader extends Thread { String serviceName = m.group(1); String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset()); - if (!config.inputConfigExists(LogFeederUtil.getClusterName(), serviceName)) { + if (!config.inputConfigExistsLogFeeder(serviceName)) { config.createInputConfig(LogFeederUtil.getClusterName(), serviceName, inputConfig); } filesHandled.add(inputConfigFile.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index f1002ae..7c487ba 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -163,6 +163,7 @@ public class InputSimulate extends Input { Class<? extends Output> clazz = output.getClass(); Output outputCopy = clazz.newInstance(); outputCopy.loadConfig(output.getConfigs()); + outputCopy.setDestination(output.getDestination()); simulateOutputs.add(outputCopy); super.addOutput(outputCopy); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java index 65b9e19..b370e58 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java @@ -28,8 +28,11 @@ import org.apache.ambari.logfeeder.common.ConfigBlock; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties; -public abstract class Output extends ConfigBlock { +public abstract class Output extends ConfigBlock implements OutputConfigMonitor { private String destination = null; protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false); @@ -37,6 +40,20 @@ public abstract class Output extends ConfigBlock { return null; } + public boolean monitorConfigChanges() { + return false; + }; + + @Override + public String getOutputType() { + throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration"); + } + + @Override + public void outputConfigChanged(OutputProperties outputProperties) { + throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration"); + }; + @Override public String getShortDescription() { return null; @@ -50,14 +67,11 @@ public abstract class Output extends ConfigBlock { return super.getNameForThread(); } - public abstract void write(String block, InputMarker inputMarker) - throws Exception; + public abstract void write(String block, InputMarker inputMarker) throws Exception; - public abstract void copyFile(File inputFile, InputMarker inputMarker) - throws UnsupportedOperationException; + public abstract void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException; - public void write(Map<String, Object> jsonObj, InputMarker inputMarker) - throws Exception { + public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception { write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker); } @@ -90,6 +104,12 @@ public abstract class Output extends ConfigBlock { this.destination = destination; } + protected LogSearchConfig logSearchConfig; + + public void setLogSearchConfig(LogSearchConfig logSearchConfig) { + this.logSearchConfig = logSearchConfig; + } + @Override public void addMetricsContainers(List<MetricData> metricsList) { super.addMetricsContainers(metricsList); @@ -99,7 +119,6 @@ public abstract class Output extends ConfigBlock { @Override public synchronized void logStat() { super.logStat(); - logStatForMetric(writeBytesMetric, "Stat: Bytes Written"); } @@ -115,5 +134,4 @@ public abstract class Output extends ConfigBlock { } } } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java index 4d6c43b..48716fa 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java @@ -33,6 +33,7 @@ import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData; import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.MurmurHash; +import org.apache.ambari.logsearch.config.api.OutputConfigMonitor; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -56,6 +57,16 @@ public class OutputManager { return outputs; } + public List<? extends OutputConfigMonitor> getOutputsToMonitor() { + List<Output> outputsToMonitor = new ArrayList<>(); + for (Output output : outputs) { + if (output.monitorConfigChanges()) { + outputsToMonitor.add(output); + } + } + return outputsToMonitor; + } + public void add(Output output) { this.outputs.add(output); } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index 162a7f8..596e022 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -25,9 +25,11 @@ import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Calendar; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -37,6 +39,8 @@ import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.ambari.logfeeder.util.DateUtil; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -44,18 +48,23 @@ import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpClientUtil; -import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer; -import org.apache.solr.client.solrj.impl.LBHttpSolrClient; import org.apache.solr.client.solrj.response.SolrPingResponse; import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.DocCollection; import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE; -public class OutputSolr extends Output { +public class OutputSolr extends Output implements CollectionStateWatcher { + private static final Logger LOG = Logger.getLogger(OutputSolr.class); + + private static final int OUTPUT_PROPERTIES_WAIT_MS = 10000; + private static final int SHARDS_WAIT_MS = 10000; + private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab"; @LogSearchPropertyDescription( name = "logfeeder.solr.jaas.file", @@ -66,8 +75,6 @@ public class OutputSolr extends Output { ) private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file"; - private static final Logger LOG = Logger.getLogger(OutputSolr.class); - private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false; @LogSearchPropertyDescription( name = "logfeeder.solr.kerberos.enable", @@ -80,17 +87,17 @@ public class OutputSolr extends Output { private static final int DEFAULT_MAX_BUFFER_SIZE = 5000; private static final int DEFAULT_MAX_INTERVAL_MS = 3000; - private static final int DEFAULT_NUMBER_OF_SHARDS = 1; - private static final int DEFAULT_SPLIT_INTERVAL = 30; private static final int DEFAULT_NUMBER_OF_WORKERS = 1; private static final boolean DEFAULT_SKIP_LOGTIME = false; private static final int RETRY_INTERVAL = 30; + private String type; private String collection; private String splitMode; private int splitInterval; - private int numberOfShards; + private List<String> shards; + private String zkConnectString; private int maxIntervalMS; private int workers; private int maxBufferSize; @@ -98,10 +105,22 @@ public class OutputSolr extends Output { private int lastSlotByMin = -1; private boolean skipLogtime = false; + private final Object propertiesLock = new Object(); + private BlockingQueue<OutputData> outgoingBuffer = null; private List<SolrWorkerThread> workerThreadList = new ArrayList<>(); @Override + public boolean monitorConfigChanges() { + return true; + }; + + @Override + public String getOutputType() { + return type; + } + + @Override protected String getStatMetricName() { return "output.solr.write_logs"; } @@ -110,24 +129,34 @@ public class OutputSolr extends Output { protected String getWriteBytesMetricName() { return "output.solr.write_bytes"; } - + @Override public void init() throws Exception { super.init(); initParams(); setupSecurity(); createOutgoingBuffer(); + createSolrStateWatcher(); createSolrWorkers(); } private void initParams() throws Exception { - splitMode = getStringValue("splits_interval_mins", "none"); - if (!splitMode.equalsIgnoreCase("none")) { - splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL); + type = getStringValue("type"); + while (true) { + OutputSolrProperties outputSolrProperties = logSearchConfig.getOutputSolrProperties(type); + if (outputSolrProperties == null) { + LOG.info("Output solr properties for type " + type + " is not available yet."); + try { Thread.sleep(OUTPUT_PROPERTIES_WAIT_MS); } catch (Exception e) { LOG.warn(e); } + } else { + initPropertiesFromLogSearchConfig(outputSolrProperties, true); + break; + } + } + + zkConnectString = getStringValue("zk_connect_string"); + if (StringUtils.isEmpty(zkConnectString)) { + throw new Exception("For solr output the zk_connect_string property need to be set"); } - isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none"); - - numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS); skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME); @@ -140,22 +169,39 @@ public class OutputSolr extends Output { maxBufferSize = 1; } - collection = getStringValue("collection"); - if (StringUtils.isEmpty(collection)) { - throw new Exception("Collection property is mandatory"); - } + LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d." + + getShortDescription(), workers, splitMode, splitInterval)); + } - LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. " - + getShortDescription(), workers, splitMode, splitInterval, numberOfShards)); + @Override + public void outputConfigChanged(OutputProperties outputProperties) { + initPropertiesFromLogSearchConfig((OutputSolrProperties)outputProperties, false); } + private void initPropertiesFromLogSearchConfig(OutputSolrProperties outputSolrProperties, boolean init) { + synchronized (propertiesLock) { + splitMode = outputSolrProperties.getSplitIntervalMins(); + if (!splitMode.equalsIgnoreCase("none")) { + splitInterval = Integer.parseInt(splitMode); + } + isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none"); + + // collection can not be overwritten after initialization + if (init) { + collection = outputSolrProperties.getCollection(); + if (StringUtils.isEmpty(collection)) { + throw new IllegalStateException("Collection property is mandatory"); + } + } + } + } private void setupSecurity() { String jaasFile = LogFeederUtil.getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE); boolean securityEnabled = LogFeederUtil.getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE); if (securityEnabled) { System.setProperty("java.security.auth.login.config", jaasFile); - HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer()); + HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer()); LOG.info("setupSecurity() called for kerberos configuration, jaas file: " + jaasFile); } } @@ -166,81 +212,70 @@ public class OutputSolr extends Output { outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize); } - private void createSolrWorkers() throws Exception, MalformedURLException { - String solrUrl = getStringValue("url"); - String zkConnectString = getStringValue("zk_connect_string"); - if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkConnectString)) { - throw new Exception("For solr output, either url or zk_connect_string property need to be set"); + private void createSolrStateWatcher() throws Exception { + if ("none".equals(splitMode)) { + return; + } + + CloudSolrClient stateWatcherClient = createSolrClient(); + stateWatcherClient.registerCollectionStateWatcher(collection, this); + while (true) { + if (shards == null) { + LOG.info("Shards are not available yet, waiting ..."); + try { Thread.sleep(SHARDS_WAIT_MS); } catch (Exception e) { LOG.warn(e); } + } else { + break; + } + } + } + + @Override + public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) { + synchronized (propertiesLock) { + shards = new ArrayList<>(collectionState.getSlicesMap().keySet()); + Collections.sort(shards); } + return false; + } + private void createSolrWorkers() throws Exception, MalformedURLException { for (int count = 0; count < workers; count++) { - SolrClient solrClient = getSolrClient(solrUrl, zkConnectString, count); + CloudSolrClient solrClient = getSolrClient(count); createSolrWorkerThread(count, solrClient); } } - SolrClient getSolrClient(String solrUrl, String zkConnectString, int count) throws Exception, MalformedURLException { - SolrClient solrClient = createSolrClient(solrUrl, zkConnectString); - pingSolr(solrUrl, zkConnectString, count, solrClient); - - return solrClient; - } + CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException { + CloudSolrClient solrClient = createSolrClient(); + pingSolr(count, solrClient); - private SolrClient createSolrClient(String solrUrl, String zkConnectString) throws Exception, MalformedURLException { - SolrClient solrClient; - if (zkConnectString != null) { - solrClient = createCloudSolrClient(zkConnectString); - } else { - solrClient = createHttpSolarClient(solrUrl); - } return solrClient; } - private SolrClient createCloudSolrClient(String zkConnectString) throws Exception { + private CloudSolrClient createSolrClient() throws Exception { LOG.info("Using zookeepr. zkConnectString=" + zkConnectString); - collection = getStringValue("collection"); - if (StringUtils.isEmpty(collection)) { - throw new Exception("For solr cloud property collection is mandatory"); - } LOG.info("Using collection=" + collection); - CloudSolrClient solrClient = new CloudSolrClient(zkConnectString); + CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(zkConnectString).build(); solrClient.setDefaultCollection(collection); return solrClient; } - private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException { - String[] solrUrls = StringUtils.split(solrUrl, ","); - if (solrUrls.length == 1) { - LOG.info("Using SolrURL=" + solrUrl); - return new HttpSolrClient(solrUrl + "/" + collection); - } else { - LOG.info("Using load balance solr client. solrUrls=" + solrUrl); - LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection); - LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection); - for (int i = 1; i < solrUrls.length; i++) { - LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection); - lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection); - } - return lbSolrClient; - } - } - - private void pingSolr(String solrUrl, String zkConnectString, int count, SolrClient solrClient) { + private void pingSolr(int count, CloudSolrClient solrClient) { try { - LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString + ", urls=" + solrUrl); + LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString); SolrPingResponse response = solrClient.ping(); if (response.getStatus() == 0) { LOG.info("Ping to Solr server is successful for worker=" + count); } else { LOG.warn( - String.format("Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkConnectString=%s, " + - "collection=%s, response=%s", count, solrUrl, zkConnectString, collection, response)); + String.format("Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s, " + + "response=%s", count, zkConnectString, collection, response)); } } catch (Throwable t) { LOG.warn(String.format( - "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkConnectString=%s, collection=%s", - count, solrUrl, zkConnectString, collection), t); + "Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s", count, + zkConnectString, collection), t); } } @@ -402,9 +437,11 @@ public class OutputSolr extends Output { boolean result = false; while (!isDrain()) { try { - if (isComputeCurrentCollection) { - // Compute the current router value - addRouterField(); + synchronized (propertiesLock) { + if (isComputeCurrentCollection) { + // Compute the current router value + addRouterField(); + } } addToSolr(outputData); resetLocalBuffer(); @@ -468,9 +505,9 @@ public class OutputSolr extends Output { int currMin = cal.get(Calendar.MINUTE); int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin; - int slotByMin = minOfWeek / splitInterval % numberOfShards; + int slotByMin = minOfWeek / splitInterval % shards.size(); - String shard = "shard" + slotByMin; + String shard = shards.get(slotByMin); if (lastSlotByMin != slotByMin) { LOG.info("Switching to shard " + shard + ", output=" + getShortDescription()); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java index 8985110..ce040f9 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java @@ -28,8 +28,10 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logsearch.config.api.LogSearchConfig; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl; import org.apache.log4j.Logger; -import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrInputDocument; @@ -48,6 +50,7 @@ public class OutputSolrTest { private static final Logger LOG = Logger.getLogger(OutputSolrTest.class); private OutputSolr outputSolr; + private LogSearchConfig logSearchConfigMock; private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>(); @Rule @@ -56,8 +59,9 @@ public class OutputSolrTest { @Before public void init() throws Exception { outputSolr = new OutputSolr() { + @SuppressWarnings("deprecation") @Override - SolrClient getSolrClient(String solrUrl, String zkConnectString, int count) throws Exception, MalformedURLException { + CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException { return new CloudSolrClient(null) { private static final long serialVersionUID = 1L; @@ -74,6 +78,13 @@ public class OutputSolrTest { }; } }; + + OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl("hadoop_logs", "none"); + logSearchConfigMock = EasyMock.createNiceMock(LogSearchConfig.class); + EasyMock.expect(logSearchConfigMock.getOutputSolrProperties("service")).andReturn(outputSolrProperties); + EasyMock.replay(logSearchConfigMock); + + outputSolr.setLogSearchConfig(logSearchConfigMock); } @Test @@ -81,9 +92,9 @@ public class OutputSolrTest { LOG.info("testOutputToSolr_uploadData()"); Map<String, Object> config = new HashMap<String, Object>(); - config.put("url", "some url"); + config.put("zk_connect_string", "some zk_connect_string"); config.put("workers", "3"); - config.put("collection", "some collection"); + config.put("type", "service"); outputSolr.loadConfig(config); outputSolr.init(); @@ -138,22 +149,21 @@ public class OutputSolrTest { assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue); assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue); - assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue, - expectedValue); + assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue, expectedValue); } } } @Test - public void testOutputToSolr_noUrlOrZkConnectString() throws Exception { + public void testOutputToSolr_noZkConnectString() throws Exception { LOG.info("testOutputToSolr_noUrlOrZkConnectString()"); expectedException.expect(Exception.class); - expectedException.expectMessage("For solr output, either url or zk_connect_string property need to be set"); + expectedException.expectMessage("For solr output the zk_connect_string property need to be set"); Map<String, Object> config = new HashMap<String, Object>(); config.put("workers", "3"); - config.put("collection", "some collection"); + config.put("type", "service"); outputSolr.loadConfig(config); outputSolr.init(); @@ -162,5 +172,6 @@ public class OutputSolrTest { @After public void cleanUp() { receivedDocs.clear(); + EasyMock.verify(logSearchConfigMock); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java index c569a27..4a44e60 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java @@ -241,4 +241,9 @@ public class SolrAuditLogPropsConfig implements SolrPropsConfig { public void setAliasNameIn(String aliasNameIn) { this.aliasNameIn = aliasNameIn; } + + @Override + public String getLogType() { + return "audit"; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java index 975e6a7..822cea4 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java @@ -145,4 +145,9 @@ public class SolrEventHistoryPropsConfig extends SolrConnectionPropsConfig { void setPopulateIntervalMins(Integer populateIntervalMins) { this.populateIntervalMins = populateIntervalMins; } + + @Override + public String getLogType() { + return null; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java index ceddf7e..cd0a1c2 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java @@ -58,4 +58,6 @@ public interface SolrPropsConfig { String getConfigSetFolder(); void setConfigSetFolder(String configSetFolder); + + String getLogType(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java index e5039d5..6a0e6b1 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java @@ -126,4 +126,9 @@ public class SolrServiceLogPropsConfig extends SolrConnectionPropsConfig { public void setReplicationFactor(Integer replicationFactor) { this.replicationFactor = replicationFactor; } + + @Override + public String getLogType() { + return "service"; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java index c34dce6..3f6df75 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java @@ -19,6 +19,7 @@ package org.apache.ambari.logsearch.configurer; +import javax.annotation.PostConstruct; import javax.inject.Inject; import javax.inject.Named; @@ -45,6 +46,8 @@ public class LogSearchConfigConfigurer implements Configurer { @Inject private LogSearchConfigState logSearchConfigState; + @PostConstruct + @Override public void start() { Thread setupThread = new Thread("setup_logsearch_config") { @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java index f2d022e..225f5a3 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java @@ -56,7 +56,7 @@ public class SolrCollectionConfigurer implements Configurer { private final SolrDaoBase solrDaoBase; private final boolean hasEnumConfig; // enumConfig.xml for solr collection - public SolrCollectionConfigurer(final SolrDaoBase solrDaoBase, final boolean hasEnumConfig) { + public SolrCollectionConfigurer(SolrDaoBase solrDaoBase, boolean hasEnumConfig) { this.solrDaoBase = solrDaoBase; this.hasEnumConfig = hasEnumConfig; } @@ -215,7 +215,8 @@ public class SolrCollectionConfigurer implements Configurer { return status; } - private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig, boolean reloadCollectionNeeded) { + private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig, + boolean reloadCollectionNeeded) { try { List<String> allCollectionList = new ListCollectionHandler().handle(solrClient, null); boolean collectionCreated = new CreateCollectionHandler(allCollectionList).handle(solrClient, solrPropsConfig); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java index 3eea08f..4142176 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java @@ -69,6 +69,7 @@ public class AuditSolrDao extends SolrDaoBase { String rangerAuditCollection = solrAuditLogPropsConfig.getRangerCollection(); try { + waitForLogSearchConfig(); new SolrCollectionConfigurer(this, true).start(); boolean createAlias = (aliasNameIn != null && StringUtils.isNotBlank(rangerAuditCollection)); if (createAlias) { http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java index 308ef1f..0752ac0 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java @@ -65,6 +65,7 @@ public class ServiceLogsSolrDao extends SolrDaoBase { public void postConstructor() { LOG.info("postConstructor() called."); try { + waitForLogSearchConfig(); new SolrCollectionConfigurer(this, true).start(); } catch (Exception e) { LOG.error("error while connecting to Solr for service logs : solrUrl=" + solrServiceLogPropsConfig.getSolrUrl() http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java index b30b6ef..15f59e4 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java @@ -24,6 +24,7 @@ import org.apache.ambari.logsearch.common.LogType; import org.apache.ambari.logsearch.common.MessageEnums; import org.apache.ambari.logsearch.conf.SolrKerberosConfig; import org.apache.ambari.logsearch.conf.SolrPropsConfig; +import org.apache.ambari.logsearch.conf.global.LogSearchConfigState; import org.apache.ambari.logsearch.conf.global.SolrCollectionState; import org.apache.ambari.logsearch.util.RESTErrorUtil; import org.apache.ambari.logsearch.util.SolrUtil; @@ -53,11 +54,21 @@ public abstract class SolrDaoBase { @Inject private SolrKerberosConfig solrKerberosConfig; - + + @Inject + protected LogSearchConfigState logSearchConfigState; + protected SolrDaoBase(LogType logType) { this.logType = logType; } + protected void waitForLogSearchConfig() { + while (!logSearchConfigState.isLogSearchConfigAvailable()) { + LOG.info("Log Search config not available yet, waiting..."); + try { Thread.sleep(1000); } catch (Exception e) { LOG.warn("Exception during waiting for Log Search Config", e); } + } + } + public QueryResponse process(SolrQuery solrQuery, String event) { SolrUtil.removeDoubleOrTripleEscapeFromFilters(solrQuery); LOG.info("Solr query will be processed: " + solrQuery); http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java index 752a1e1..b6e9def 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java @@ -19,6 +19,9 @@ package org.apache.ambari.logsearch.handler; import org.apache.ambari.logsearch.conf.SolrPropsConfig; +import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties; +import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl; +import org.apache.ambari.logsearch.configurer.LogSearchConfigConfigurer; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; @@ -50,7 +53,7 @@ public class CreateCollectionHandler implements SolrZkRequestHandler<Boolean> { private static final String MODIFY_COLLECTION_QUERY = "/admin/collections?action=MODIFYCOLLECTION&collection=%s&%s=%d"; private static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode"; - private List<String> allCollectionList; + private final List<String> allCollectionList; public CreateCollectionHandler(List<String> allCollectionList) { this.allCollectionList = allCollectionList; @@ -58,12 +61,19 @@ public class CreateCollectionHandler implements SolrZkRequestHandler<Boolean> { @Override public Boolean handle(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) throws Exception { + if (solrPropsConfig.getLogType() != null) { + OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl(solrPropsConfig.getCollection(), + solrPropsConfig.getSplitInterval()); + LogSearchConfigConfigurer.getConfig().saveOutputSolrProperties(solrPropsConfig.getLogType(), outputSolrProperties); + } + boolean result; if (solrPropsConfig.getSplitInterval().equalsIgnoreCase("none")) { result = createCollection(solrClient, solrPropsConfig, this.allCollectionList); } else { result = setupCollectionsWithImplicitRouting(solrClient, solrPropsConfig, this.allCollectionList); } + return result; } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java index 2c143c0..a1181b4 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java @@ -33,7 +33,6 @@ import org.apache.log4j.Logger; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import javax.annotation.PostConstruct; import javax.inject.Inject; import javax.inject.Named; import javax.validation.ConstraintViolation; @@ -49,11 +48,6 @@ public class ShipperConfigManager extends JsonManagerBase { @Inject private LogSearchConfigConfigurer logSearchConfigConfigurer; - - @PostConstruct - private void postConstructor() { - logSearchConfigConfigurer.start(); - } public List<String> getServices(String clusterName) { return LogSearchConfigConfigurer.getConfig().getServices(clusterName); @@ -66,7 +60,7 @@ public class ShipperConfigManager extends JsonManagerBase { public Response createInputConfig(String clusterName, String serviceName, LSServerInputConfig inputConfig) { try { - if (LogSearchConfigConfigurer.getConfig().inputConfigExists(clusterName, serviceName)) { + if (LogSearchConfigConfigurer.getConfig().inputConfigExistsServer(clusterName, serviceName)) { return Response.serverError() .type(MediaType.APPLICATION_JSON) .entity(ImmutableMap.of("errorMessage", "Input config already exists for service " + serviceName)) @@ -83,7 +77,7 @@ public class ShipperConfigManager extends JsonManagerBase { public Response setInputConfig(String clusterName, String serviceName, LSServerInputConfig inputConfig) { try { - if (!LogSearchConfigConfigurer.getConfig().inputConfigExists(clusterName, serviceName)) { + if (!LogSearchConfigConfigurer.getConfig().inputConfigExistsServer(clusterName, serviceName)) { return Response.serverError() .type(MediaType.APPLICATION_JSON) .entity(ImmutableMap.of("errorMessage", "Input config doesn't exist for service " + serviceName)) http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json ---------------------------------------------------------------------- diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json index 55fd36c..f41e981 100644 --- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json +++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json @@ -5,9 +5,7 @@ "comment": "Output to solr for service logs", "destination": "solr", "zk_connect_string": "localhost:9983", - "collection": "hadoop_logs", - "number_of_shards": "3", - "splits_interval_mins": "2", + "type": "service", "skip_logtime": "true", "conditions": { "fields": { @@ -22,9 +20,7 @@ "is_enabled": "true", "destination": "solr", "zk_connect_string": "localhost:9983", - "collection": "audit_logs", - "number_of_shards": "3", - "splits_interval_mins": "2", + "type": "audit", "skip_logtime": "true", "conditions": { "fields": { @@ -35,4 +31,4 @@ } } ] -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java index b4502d6..6caa770 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java @@ -352,41 +352,59 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog { updateConfigurationPropertiesForCluster(cluster, "logfeeder-properties", newProperties, true, true); } - Config logfeederLog4jProperties = cluster.getDesiredConfigByType("logfeeder-log4j"); - if (logfeederLog4jProperties != null) { - String content = logfeederLog4jProperties.getProperties().get("content"); + Config logFeederLog4jProperties = cluster.getDesiredConfigByType("logfeeder-log4j"); + if (logFeederLog4jProperties != null) { + String content = logFeederLog4jProperties.getProperties().get("content"); if (content.contains("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">")) { content = content.replace("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">"); updateConfigurationPropertiesForCluster(cluster, "logfeeder-log4j", Collections.singletonMap("content", content), true, true); } } - Config logsearchLog4jProperties = cluster.getDesiredConfigByType("logsearch-log4j"); - if (logsearchLog4jProperties != null) { - String content = logsearchLog4jProperties.getProperties().get("content"); + Config logSearchLog4jProperties = cluster.getDesiredConfigByType("logsearch-log4j"); + if (logSearchLog4jProperties != null) { + String content = logSearchLog4jProperties.getProperties().get("content"); if (content.contains("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">")) { content = content.replace("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">"); updateConfigurationPropertiesForCluster(cluster, "logsearch-log4j", Collections.singletonMap("content", content), true, true); } } - Config logsearchServiceLogsConfig = cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig"); - if (logsearchServiceLogsConfig != null) { - String content = logsearchServiceLogsConfig.getProperties().get("content"); + Config logSearchServiceLogsConfig = cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig"); + if (logSearchServiceLogsConfig != null) { + String content = logSearchServiceLogsConfig.getProperties().get("content"); if (content.contains("class=\"solr.admin.AdminHandlers\"")) { content = content.replaceAll("(?s)<requestHandler name=\"/admin/\".*?class=\"solr.admin.AdminHandlers\" />", ""); updateConfigurationPropertiesForCluster(cluster, "logsearch-service_logs-solrconfig", Collections.singletonMap("content", content), true, true); } } - Config logsearchAuditLogsConfig = cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig"); - if (logsearchAuditLogsConfig != null) { - String content = logsearchAuditLogsConfig.getProperties().get("content"); + Config logSearchAuditLogsConfig = cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig"); + if (logSearchAuditLogsConfig != null) { + String content = logSearchAuditLogsConfig.getProperties().get("content"); if (content.contains("class=\"solr.admin.AdminHandlers\"")) { content = content.replaceAll("(?s)<requestHandler name=\"/admin/\".*?class=\"solr.admin.AdminHandlers\" />", ""); updateConfigurationPropertiesForCluster(cluster, "logsearch-audit_logs-solrconfig", Collections.singletonMap("content", content), true, true); } } + + Config logFeederOutputConfig = cluster.getDesiredConfigByType("logfeeder-output-config"); + if (logFeederOutputConfig != null) { + String content = logFeederOutputConfig.getProperties().get("content"); + content = content.replace( + " \"collection\":\"{{logsearch_solr_collection_service_logs}}\",\n" + + " \"number_of_shards\": \"{{logsearch_collection_service_logs_numshards}}\",\n" + + " \"splits_interval_mins\": \"{{logsearch_service_logs_split_interval_mins}}\",\n", + " \"type\": \"service\",\n"); + + content = content.replace( + " \"collection\":\"{{logsearch_solr_collection_audit_logs}}\",\n" + + " \"number_of_shards\": \"{{logsearch_collection_audit_logs_numshards}}\",\n" + + " \"splits_interval_mins\": \"{{logsearch_audit_logs_split_interval_mins}}\",\n", + " \"type\": \"audit\",\n"); + + updateConfigurationPropertiesForCluster(cluster, "logsearch-output-config", Collections.singletonMap("content", content), true, true); + } } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 index 214e5ba..0c599c9 100644 --- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 +++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 @@ -22,9 +22,7 @@ "is_enabled":"{{solr_service_logs_enable}}", "destination":"solr", "zk_connect_string":"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}", - "collection":"{{logsearch_solr_collection_service_logs}}", - "number_of_shards": "{{logsearch_collection_service_logs_numshards}}", - "splits_interval_mins": "{{logsearch_service_logs_split_interval_mins}}", + "type": "service", "conditions":{ "fields":{ "rowtype":[ @@ -41,9 +39,7 @@ "is_enabled":"{{solr_audit_logs_enable}}", "destination":"solr", "zk_connect_string":"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}", - "collection":"{{logsearch_solr_collection_audit_logs}}", - "number_of_shards": "{{logsearch_collection_audit_logs_numshards}}", - "splits_interval_mins": "{{logsearch_audit_logs_split_interval_mins}}", + "type": "audit", "conditions":{ "fields":{ "rowtype":[