This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new b1c397f HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita) b1c397f is described below commit b1c397f769de348d38f4b8eab217cab8eaefa5e1 Author: Peter Vary <pv...@cloudera.com> AuthorDate: Wed Jul 3 09:30:30 2019 +0200 HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 ++ .../llap/tezplugins/LlapTaskSchedulerService.java | 2 +- .../tezplugins/metrics/LlapMetricsCollector.java | 57 +++++++++++++-- .../tezplugins/metrics/LlapMetricsListener.java | 47 ++++++++++++ .../metrics/TestLlapMetricsCollector.java | 84 ++++++++++++++++++++++ 5 files changed, 187 insertions(+), 8 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 48b49ce..375ceea 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4353,6 +4353,11 @@ public class HiveConf extends Configuration { new TimeValidator(TimeUnit.MILLISECONDS), "Collect llap daemon metrics in the AM every given milliseconds,\n" + "so that the AM can use this information, to make better scheduling decisions.\n" + "If it's set to 0, then the feature is disabled."), + LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER( + "hive.llap.task.scheduler.am.collect.daemon.metrics.listener", "", + "The listener which is called when new Llap Daemon statistics is received on AM side.\n" + + "The listener should implement the " + + "org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."), LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", "AM registry name for LLAP task scheduler plugin to register with."), 
LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "", diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 37e2fcd..a97a934 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -404,7 +404,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS) > 0) { - this.llapMetricsCollector = new LlapMetricsCollector(conf); + this.llapMetricsCollector = new LlapMetricsCollector(conf, registry); this.registry.registerServiceListener(llapMetricsCollector); } diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java index 99a2521..2ca7ed6 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java @@ -1,7 +1,11 @@ /* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -30,6 +34,7 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,26 +63,50 @@ public class LlapMetricsCollector implements ServiceStateChangeListener, private final Map<String, LlapManagementProtocolClientImpl> llapClients; private final Map<String, LlapMetrics> instanceStatisticsMap; private final long metricsCollectionMs; + @VisibleForTesting + final LlapMetricsListener listener; - public LlapMetricsCollector(Configuration conf) { + public LlapMetricsCollector(Configuration conf, LlapRegistryService registry) { this( conf, Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_NAME) .build()), - LlapManagementProtocolClientImplFactory.basicInstance(conf)); + LlapManagementProtocolClientImplFactory.basicInstance(conf), + registry); } @VisibleForTesting LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor, LlapManagementProtocolClientImplFactory clientFactory) { + this(conf, scheduledMetricsExecutor, clientFactory, null); + } + + @VisibleForTesting + LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor, + LlapManagementProtocolClientImplFactory clientFactory, + LlapRegistryService registry) { this.scheduledMetricsExecutor = scheduledMetricsExecutor; this.clientFactory = clientFactory; this.llapClients = new HashMap<>(); this.instanceStatisticsMap = new ConcurrentHashMap<>(); this.metricsCollectionMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS); + String listenerClass = HiveConf.getVar(conf, + 
HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER); + if (Strings.isBlank(listenerClass)) { + listener = null; + } else { + try { + listener = (LlapMetricsListener)Class.forName(listenerClass.trim()).newInstance(); + listener.init(conf, registry); + } catch (Exception e) { + throw new IllegalArgumentException("Wrong configuration for " + + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER + + " " + listenerClass, e); + } + } } public void start() { @@ -101,13 +130,27 @@ public class LlapMetricsCollector implements ServiceStateChangeListener, LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics = client.getDaemonMetrics(null, LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.newBuilder().build()); - instanceStatisticsMap.put(identity, new LlapMetrics(metrics)); - + LlapMetrics newMetrics = new LlapMetrics(metrics); + instanceStatisticsMap.put(identity, newMetrics); + if (listener != null) { + try { + listener.newDaemonMetrics(identity, newMetrics); + } catch (Throwable t) { + LOG.warn("LlapMetricsListener thrown an unexpected exception", t); + } + } } catch (ServiceException ex) { LOG.error(ex.getMessage(), ex); instanceStatisticsMap.remove(identity); } } + if (listener != null) { + try { + listener.newClusterMetrics(getMetrics()); + } catch (Throwable t) { + LOG.warn("LlapMetricsListener thrown an unexpected exception", t); + } + } } @Override diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java new file mode 100644 index 0000000..446100b --- /dev/null +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics; + +import java.util.Map; + +/** + * Interface to handle Llap Daemon metrics changes. + */ +public interface LlapMetricsListener { + + /** + * Initializes the listener with the current configuration. + * @param conf The configuration + * @param registry The Llap registry service to access the Llap Daemons + */ + void init(Configuration conf, LlapRegistryService registry); + + /** + * Handler that is called when new Llap Daemon metrics data arrives. + * @param workerIdentity The worker identity of the Llap Daemon + * @param newMetrics The new metrics object + */ + void newDaemonMetrics(String workerIdentity, LlapMetrics newMetrics); + + /** + * Handler that is called when new data has arrived for every active Llap Daemon in the cluster.
+ * @param newMetrics The map from worker identity to metrics + */ + void newClusterMetrics(Map<String, LlapMetrics> newMetrics); +} diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java index 6da4d8c..d9e1f71 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java @@ -29,11 +29,15 @@ import org.junit.Test; import org.mockito.Mock; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -73,6 +77,9 @@ public class TestLlapMetricsCollector { when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.varname, HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.defaultStrVal)).thenReturn("30000ms"); + when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.varname, + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.defaultStrVal)) + .thenReturn(MockListener.class.getName()); when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient); when(mockClient.getDaemonMetrics( any(RpcController.class), @@ -220,4 +227,81 @@ public class TestLlapMetricsCollector { .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); } + + /** + * Check that the listener is created and called.
The default config contains the mock listener. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testListener() { + // Given + LlapServiceInstance mockService1 = mock(LlapServiceInstance.class); + when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1); + LlapServiceInstance mockService2 = mock(LlapServiceInstance.class); + when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2); + + // When + collector.onCreate(mockService1, TEST_SEQ_VERSION); + collector.onCreate(mockService2, TEST_SEQ_VERSION); + collector.collectMetrics(); + collector.collectMetrics(); + collector.collectMetrics(); + + // Then + assertNotNull(collector.listener); + assertEquals(1, ((MockListener)collector.listener).initCount); + assertEquals(3, ((MockListener)collector.listener).fullMetricsCount); + assertEquals(6, ((MockListener)collector.listener).daemonMetricsCount); + } + + /** + * Check that the collector is working without the listener too. + */ + @Test(timeout = DEFAULT_TIMEOUT) + public void testWithoutListener() { + // Given + when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.varname, + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.defaultStrVal)).thenReturn(""); + collector = new LlapMetricsCollector(mockConf, mockExecutor, mockClientFactory); + + LlapServiceInstance mockService1 = mock(LlapServiceInstance.class); + when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1); + LlapServiceInstance mockService2 = mock(LlapServiceInstance.class); + when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2); + + // Check that there is no exception with start / create / remove / collect + collector.start(); + collector.onCreate(mockService1, TEST_SEQ_VERSION); + collector.onCreate(mockService2, TEST_SEQ_VERSION); + collector.onRemove(mockService2, TEST_SEQ_VERSION); + collector.collectMetrics(); + + // Then + assertNull(collector.listener); + } + + /** + * Just count the calls. 
+ */ + static class MockListener implements LlapMetricsListener { + int initCount = 0; + int daemonMetricsCount = 0; + int fullMetricsCount = 0; + + @Override + public void init(Configuration configuration, LlapRegistryService registry) { + initCount++; + } + + @Override + public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) { + assertTrue("Init should be called first", initCount > 0); + daemonMetricsCount++; + } + + @Override + public void newClusterMetrics(Map<String, LlapMetricsCollector.LlapMetrics> newMetrics) { + assertTrue("Init should be called first", initCount > 0); + fullMetricsCount++; + } + } }