Github user wardbekker commented on a diff in the pull request: https://github.com/apache/metron/pull/946#discussion_r179060961 --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java --- @@ -107,32 +113,94 @@ public static String getBaseIndexName(String indexName) { return parts[0]; } - public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) { + /** + * Instantiates an Elasticsearch client based on es.client.class, if set. Defaults to + * org.elasticsearch.transport.client.PreBuiltTransportClient. + * + * @param globalConfiguration Metron global config + * @return + */ + public static TransportClient getClient(Map<String, Object> globalConfiguration) { + Set<String> customESSettings = new HashSet<>(); + customESSettings.addAll(Arrays.asList("es.client.class", "es.xpack.username", "es.xpack.password.file")); Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); - settingsBuilder.put("client.transport.ping_timeout","500s"); - if (optionalSettings != null) { - settingsBuilder.put(optionalSettings); + Map<String, String> esSettings = getEsSettings(globalConfiguration); + for (Map.Entry<String, String> entry : esSettings.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (!customESSettings.contains(key)) { + settingsBuilder.put(key, value); + } } - Settings settings = settingsBuilder.build(); - TransportClient client; - try{ + settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); + settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s")); + setXPackSecurityOrNone(settingsBuilder, esSettings); + + try { LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors()); // Netty sets available processors statically and if an attempt is made to set it more than // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87) // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082 // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036 System.setProperty("es.set.netty.runtime.available.processors", "false"); - client = new PreBuiltTransportClient(settings); - for(HostnamePort hp : getIps(globalConfiguration)) { + TransportClient client = createTransportClient(settingsBuilder.build(), esSettings); + for (HostnamePort hp : getIps(globalConfiguration)) { client.addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port) ); } - } catch (UnknownHostException exception){ + return client; + } catch (UnknownHostException exception) { throw new RuntimeException(exception); } - return client; + } + + private static Map<String, String> getEsSettings(Map<String, Object> config) { + return ConversionUtils + .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()), + String.class); + } + + private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) { + if (esSettings.containsKey("es.xpack.password.file")) { --- End diff -- Done
---