This is an automated email from the ASF dual-hosted git repository. payert pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 4fd9938 AMBARI-25549 NegativeArraySizeException thrown when invoking CurrentCollectorHost (#3225) 4fd9938 is described below commit 4fd9938a469398fc270b89d5f84133ccab2a4eb0 Author: payert <35402259+pay...@users.noreply.github.com> AuthorDate: Tue Sep 15 09:40:03 2020 +0200 AMBARI-25549 NegativeArraySizeException thrown when invoking CurrentCollectorHost (#3225) * AMBARI-25549 NegativeArraySizeException thrown when invoking CurrentCollectorHost --- .../sink/timeline/AbstractTimelineMetricsSink.java | 178 ++++++++++++--------- ...etricSinkWriteShardHostnameHashingStrategy.java | 1 + .../timeline/AbstractTimelineMetricSinkTest.java | 67 ++++++++ 3 files changed, 170 insertions(+), 76 deletions(-) diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index f1511f6..d5586d9 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -119,6 +119,8 @@ public abstract class AbstractTimelineMetricsSink { // well as timed refresh protected Supplier<String> targetCollectorHostSupplier; + private final CollectorShardSupplier collectorShardSupplier = new CollectorShardSupplier(); + protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>(); private volatile boolean isInitializedForHA = false; @@ -148,6 +150,20 @@ public abstract class AbstractTimelineMetricsSink { .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); } + private final class CollectorShardSupplier implements Supplier<String> { + @Override + public String get() { + //shardExpired flag is used to determine if the Supplier.get() is invoked through the + // findPreferredCollectHost method (No need to refresh collector hosts) + // OR + // through Expiry (Refresh needed to pick up dead collectors that might have not become alive). + if (shardExpired) { + refreshCollectorsFromConfigured(getConfiguredCollectorHosts()); + } + return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); + } + } + public AbstractTimelineMetricsSink() { LOG = LogFactory.getLog(this.getClass()); } @@ -272,18 +288,22 @@ public abstract class AbstractTimelineMetricsSink { protected String getCurrentCollectorHost() { String collectorHost; - // Get cached target - if (targetCollectorHostSupplier != null) { - collectorHost = targetCollectorHostSupplier.get(); - // Last X attempts have failed - force refresh - if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { - LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors."); - allKnownLiveCollectors.remove(collectorHost); - targetCollectorHostSupplier = null; + /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same + object under the hood to provide thread safety. */ + synchronized (collectorShardSupplier) { + // Get cached target + if (targetCollectorHostSupplier != null) { + collectorHost = targetCollectorHostSupplier.get(); + // Last X attempts have failed - force refresh + if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { + LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors."); + allKnownLiveCollectors.remove(collectorHost); + targetCollectorHostSupplier = null; + collectorHost = findPreferredCollectHost(); + } + } else { collectorHost = findPreferredCollectHost(); } - } else { - collectorHost = findPreferredCollectHost(); } if (collectorHost == null) { @@ -360,11 +380,15 @@ public abstract class AbstractTimelineMetricsSink { * * @return the app cookie manager */ - public synchronized AppCookieManager getAppCookieManager() { - if (appCookieManager == null) { - appCookieManager = new AppCookieManager(); + public AppCookieManager getAppCookieManager() { + /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same + object under the hood to provide thread safety. */ + synchronized (collectorShardSupplier) { + if (appCookieManager == null) { + appCookieManager = new AppCookieManager(); + } + return appCookieManager; } - return appCookieManager; } /** @@ -513,72 +537,59 @@ public abstract class AbstractTimelineMetricsSink { * * @return String Collector hostname */ - protected synchronized String findPreferredCollectHost() { - if (!isInitializedForHA) { - init(); - } - - shardExpired = false; - // Auto expire and re-calculate after 1 hour - if (targetCollectorHostSupplier != null) { - String targetCollector = targetCollectorHostSupplier.get(); - if (targetCollector != null) { - return targetCollector; + protected String findPreferredCollectHost() { + /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same + object under the hood to provide thread safety. */ + synchronized (collectorShardSupplier) { + if (!isInitializedForHA) { + init(); } - } - // Reach out to all configured collectors before Zookeeper - Collection<String> collectorHosts = getConfiguredCollectorHosts(); - refreshCollectorsFromConfigured(collectorHosts); - - // Lookup Zookeeper for live hosts - max 10 seconds wait time - long currentTime = System.currentTimeMillis(); - if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null - && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) { - - LOG.debug("No live collectors from configuration. Requesting zookeeper..."); - allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); - boolean noNewCollectorFromZk = true; - for (String collectorHostFromZk : allKnownLiveCollectors) { - if (!collectorHosts.contains(collectorHostFromZk)) { - noNewCollectorFromZk = false; - break; + shardExpired = false; + // Auto expire and re-calculate after 1 hour + if (targetCollectorHostSupplier != null) { + String targetCollector = targetCollectorHostSupplier.get(); + if (targetCollector != null) { + return targetCollector; } } - if (noNewCollectorFromZk) { - LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis"); - lastFailedZkRequestTime = System.currentTimeMillis(); - } - } - if (allKnownLiveCollectors.size() != 0) { - targetCollectorHostSupplier = Suppliers.memoizeWithExpiration( - new Supplier<String>() { - @Override - public String get() { - //shardExpired flag is used to determine if the Supplier.get() is invoked through the - // findPreferredCollectHost method (No need to refresh collector hosts - // OR - // through Expiry (Refresh needed to pick up dead collectors that might have not become alive). - if (shardExpired) { - refreshCollectorsFromConfigured(getConfiguredCollectorHosts()); - } - return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); + // Reach out to all configured collectors before Zookeeper + Collection<String> collectorHosts = getConfiguredCollectorHosts(); + refreshCollectorsFromConfigured(collectorHosts); + + // Lookup Zookeeper for live hosts - max 10 seconds wait time + long currentTime = System.currentTimeMillis(); + if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null + && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) { + + LOG.debug("No live collectors from configuration. Requesting zookeeper..."); + allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); + boolean noNewCollectorFromZk = true; + for (String collectorHostFromZk : allKnownLiveCollectors) { + if (!collectorHosts.contains(collectorHostFromZk)) { + noNewCollectorFromZk = false; + break; } - }, // random.nextInt(max - min + 1) + min # (60 to 75 minutes) - rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES - - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1) - + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES, - TimeUnit.MINUTES - ); - - String collectorHost = targetCollectorHostSupplier.get(); + } + if (noNewCollectorFromZk) { + LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis"); + lastFailedZkRequestTime = System.currentTimeMillis(); + } + } + + if (allKnownLiveCollectors.size() != 0) { + SupplierExpiry expiry = getSupplierExpiry(); + targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(collectorShardSupplier, expiry.duration, expiry.timeUnit); + + String collectorHost = targetCollectorHostSupplier.get(); + shardExpired = true; + return collectorHost; + } + LOG.debug("Couldn't find any live collectors. Returning null"); shardExpired = true; - return collectorHost; + return null; } - LOG.debug("Couldn't find any live collectors. Returning null"); - shardExpired = true; - return null; } private void refreshCollectorsFromConfigured(Collection<String> collectorHosts) { @@ -591,14 +602,12 @@ public abstract class AbstractTimelineMetricsSink { try { Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort()); // Update live Hosts - current host will already be a part of this - for (String host : liveHosts) { - allKnownLiveCollectors.add(host); - } + allKnownLiveCollectors.addAll(liveHosts); break; // Found at least 1 live collector } catch (MetricCollectorUnavailableException e) { LOG.debug("Collector " + hostStr + " is not longer live. Removing " + "it from list of know live collector hosts : " + allKnownLiveCollectors); - allKnownLiveCollectors.remove(hostStr); + boolean res = allKnownLiveCollectors.remove(hostStr); } } } @@ -711,6 +720,23 @@ public abstract class AbstractTimelineMetricsSink { return metricsPostCache; } + class SupplierExpiry { + final long duration; + final TimeUnit timeUnit; + + public SupplierExpiry(long duration, TimeUnit timeUnit) { + this.duration = duration; + this.timeUnit = timeUnit; + } + } + protected SupplierExpiry getSupplierExpiry() { + // random.nextInt(max - min + 1) + min # (60 to 75 minutes) + long duration = rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES + - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1) + + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES; + return new SupplierExpiry(duration, TimeUnit.MINUTES); + } + /** * Get a pre-formatted URI for the collector */ diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java index 25bff54..23bf68c 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java @@ -36,6 +36,7 @@ public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWr @Override public String findCollectorShard(List<String> collectorHosts) { + if (collectorHosts.isEmpty()) return null; long index = hostnameHash % collectorHosts.size(); index = index < 0 ? index + collectorHosts.size() : index; String collectorHost = collectorHosts.get((int) index); diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java index f3f8a62..71b9703 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java @@ -30,7 +30,13 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.expect; @@ -204,6 +210,41 @@ public class AbstractTimelineMetricSinkTest { verifyAll(); } + @Test + public void testGetCurrentCollectorHostMultiThreaded() { + final int threadPoolSize = 32; + final int numberOfThreads = threadPoolSize * 8; + final AtomicBoolean stop = new AtomicBoolean(false); + final TestTimelineMetricsThreadingSink sink = new TestTimelineMetricsThreadingSink(); + final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); + int threadCount = 0; + + try { + while (!stop.get() && threadCount < numberOfThreads) { + executorService.execute(() -> { + try { + String host = sink.getCurrentCollectorHost(); + } catch (Exception e) { + e.printStackTrace(); + stop.set(true); + executorService.shutdownNow(); + } + }); + + threadCount++; + } + + executorService.awaitTermination(30, TimeUnit.SECONDS); + if(stop.get()) { + Assert.fail("Unexpected exception(s) has been thrown! See the stack traces above!"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + executorService.shutdown(); + } + } + private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink { @Override protected String getCollectorUri(String host) { @@ -254,5 +295,31 @@ public class AbstractTimelineMetricSinkTest { protected String getHostInMemoryAggregationProtocol() { return "http"; } + + } + + private class TestTimelineMetricsThreadingSink extends TestTimelineMetricsSink { + private final AtomicInteger counter = new AtomicInteger(0); + private final Collection<String> hosts = Collections.singletonList("host1"); + + @Override + protected Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException { + failedCollectorConnectionsCounter.set(4); + int c = counter.getAndIncrement(); + if (c % 2 == 0 || c % 3 == 0) { + throw new MetricCollectorUnavailableException("MetricCollectorUnavailable"); + } + return hosts; + } + + @Override + protected Collection<String> getConfiguredCollectorHosts() { + return hosts; + } + + @Override + protected SupplierExpiry getSupplierExpiry() { + return new SupplierExpiry(128, TimeUnit.MILLISECONDS); + } } }