This is an automated email from the ASF dual-hosted git repository. wuzhiguo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push: new dd01477 AMBARI-25549: NegativeArraySizeException thrown when invoking CurrentCollectorHost (#57) dd01477 is described below commit dd0147782f26a6a1ba4ce13181a2ee3d95320afa Author: lucasbak <lucas.bakal...@gmail.com> AuthorDate: Wed Nov 16 09:38:18 2022 +0100 AMBARI-25549: NegativeArraySizeException thrown when invoking CurrentCollectorHost (#57) --- .../sink/timeline/AbstractTimelineMetricsSink.java | 178 ++++++++++++--------- ...etricSinkWriteShardHostnameHashingStrategy.java | 5 +- .../timeline/AbstractTimelineMetricSinkTest.java | 66 ++++++++ 3 files changed, 169 insertions(+), 80 deletions(-) diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index d77f60c..ee036ca 100644 --- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -119,6 +119,8 @@ public abstract class AbstractTimelineMetricsSink { // well as timed refresh protected Supplier<String> targetCollectorHostSupplier; + private final CollectorShardSupplier collectorShardSupplier = new CollectorShardSupplier(); + protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>(); private volatile boolean isInitializedForHA = false; @@ -148,6 +150,20 @@ public abstract class AbstractTimelineMetricsSink { .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); } + private final class CollectorShardSupplier implements Supplier<String> { + @Override + public String get() { + //shardExpired flag is used to determine if the Supplier.get() is invoked through the + // findPreferredCollectHost method (No need to refresh collector hosts) + // OR + // through Expiry (Refresh needed to pick up dead collectors that might have not become alive). + if (shardExpired) { + refreshCollectorsFromConfigured(getConfiguredCollectorHosts()); + } + return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); + } + } + public AbstractTimelineMetricsSink() { LOG = LogFactory.getLog(this.getClass()); } @@ -272,18 +288,22 @@ public abstract class AbstractTimelineMetricsSink { protected String getCurrentCollectorHost() { String collectorHost; - // Get cached target - if (targetCollectorHostSupplier != null) { - collectorHost = targetCollectorHostSupplier.get(); - // Last X attempts have failed - force refresh - if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { - LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors."); - allKnownLiveCollectors.remove(collectorHost); - targetCollectorHostSupplier = null; + /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same + object under the hood to provide thread safety. */ + synchronized (collectorShardSupplier) { + // Get cached target + if (targetCollectorHostSupplier != null) { + collectorHost = targetCollectorHostSupplier.get(); + // Last X attempts have failed - force refresh + if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { + LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors."); + allKnownLiveCollectors.remove(collectorHost); + targetCollectorHostSupplier = null; + collectorHost = findPreferredCollectHost(); + } + } else { collectorHost = findPreferredCollectHost(); } - } else { - collectorHost = findPreferredCollectHost(); } if (collectorHost == null) { @@ -360,11 +380,15 @@ public abstract class AbstractTimelineMetricsSink { * * @return the app cookie manager */ - public synchronized AppCookieManager getAppCookieManager() { - if (appCookieManager == null) { - appCookieManager = new AppCookieManager(); + public AppCookieManager getAppCookieManager() { + /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same + object under the hood to provide thread safety. */ + synchronized (collectorShardSupplier) { + if (appCookieManager == null) { + appCookieManager = new AppCookieManager(); + } + return appCookieManager; } - return appCookieManager; } /** @@ -513,72 +537,59 @@ public abstract class AbstractTimelineMetricsSink { * * @return String Collector hostname */ - protected synchronized String findPreferredCollectHost() { - if (!isInitializedForHA) { - init(); - } - - shardExpired = false; - // Auto expire and re-calculate after 1 hour - if (targetCollectorHostSupplier != null) { - String targetCollector = targetCollectorHostSupplier.get(); - if (targetCollector != null) { - return targetCollector; + protected String findPreferredCollectHost() { + /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same + object under the hood to provide thread safety. */ + synchronized (collectorShardSupplier) { + if (!isInitializedForHA) { + init(); } - } - // Reach out to all configured collectors before Zookeeper - Collection<String> collectorHosts = getConfiguredCollectorHosts(); - refreshCollectorsFromConfigured(collectorHosts); - - // Lookup Zookeeper for live hosts - max 10 seconds wait time - long currentTime = System.currentTimeMillis(); - if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null - && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) { - - LOG.debug("No live collectors from configuration. Requesting zookeeper..."); - allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); - boolean noNewCollectorFromZk = true; - for (String collectorHostFromZk : allKnownLiveCollectors) { - if (!collectorHosts.contains(collectorHostFromZk)) { - noNewCollectorFromZk = false; - break; + shardExpired = false; + // Auto expire and re-calculate after 1 hour + if (targetCollectorHostSupplier != null) { + String targetCollector = targetCollectorHostSupplier.get(); + if (targetCollector != null) { + return targetCollector; } } - if (noNewCollectorFromZk) { - LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis"); - lastFailedZkRequestTime = System.currentTimeMillis(); - } - } - if (allKnownLiveCollectors.size() != 0) { - targetCollectorHostSupplier = Suppliers.memoizeWithExpiration( - new Supplier<String>() { - @Override - public String get() { - //shardExpired flag is used to determine if the Supplier.get() is invoked through the - // findPreferredCollectHost method (No need to refresh collector hosts - // OR - // through Expiry (Refresh needed to pick up dead collectors that might have not become alive). - if (shardExpired) { - refreshCollectorsFromConfigured(getConfiguredCollectorHosts()); - } - return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); + // Reach out to all configured collectors before Zookeeper + Collection<String> collectorHosts = getConfiguredCollectorHosts(); + refreshCollectorsFromConfigured(collectorHosts); + + // Lookup Zookeeper for live hosts - max 10 seconds wait time + long currentTime = System.currentTimeMillis(); + if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null + && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) { + + LOG.debug("No live collectors from configuration. Requesting zookeeper..."); + allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); + boolean noNewCollectorFromZk = true; + for (String collectorHostFromZk : allKnownLiveCollectors) { + if (!collectorHosts.contains(collectorHostFromZk)) { + noNewCollectorFromZk = false; + break; } - }, // random.nextInt(max - min + 1) + min # (60 to 75 minutes) - rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES - - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1) - + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES, - TimeUnit.MINUTES - ); - - String collectorHost = targetCollectorHostSupplier.get(); + } + if (noNewCollectorFromZk) { + LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis"); + lastFailedZkRequestTime = System.currentTimeMillis(); + } + } + + if (allKnownLiveCollectors.size() != 0) { + SupplierExpiry expiry = getSupplierExpiry(); + targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(collectorShardSupplier, expiry.duration, expiry.timeUnit); + + String collectorHost = targetCollectorHostSupplier.get(); + shardExpired = true; + return collectorHost; + } + LOG.debug("Couldn't find any live collectors. Returning null"); shardExpired = true; - return collectorHost; + return null; } - LOG.debug("Couldn't find any live collectors. Returning null"); - shardExpired = true; - return null; } private void refreshCollectorsFromConfigured(Collection<String> collectorHosts) { @@ -591,14 +602,12 @@ public abstract class AbstractTimelineMetricsSink { try { Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort()); // Update live Hosts - current host will already be a part of this - for (String host : liveHosts) { - allKnownLiveCollectors.add(host); - } + allKnownLiveCollectors.addAll(liveHosts); break; // Found at least 1 live collector } catch (MetricCollectorUnavailableException e) { LOG.debug("Collector " + hostStr + " is not longer live. Removing " + "it from list of know live collector hosts : " + allKnownLiveCollectors); - allKnownLiveCollectors.remove(hostStr); + boolean res = allKnownLiveCollectors.remove(hostStr); } } } @@ -711,6 +720,23 @@ public abstract class AbstractTimelineMetricsSink { return metricsPostCache; } + class SupplierExpiry { + final long duration; + final TimeUnit timeUnit; + + public SupplierExpiry(long duration, TimeUnit timeUnit) { + this.duration = duration; + this.timeUnit = timeUnit; + } + } + protected SupplierExpiry getSupplierExpiry() { + // random.nextInt(max - min + 1) + min # (60 to 75 minutes) + long duration = rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES + - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1) + + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES; + return new SupplierExpiry(duration, TimeUnit.MINUTES); + } + /** * Get a pre-formatted URI for the collector */ diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java index c79bbfb..23bf68c 100644 --- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java +++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java @@ -36,10 +36,7 @@ public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWr @Override public String findCollectorShard(List<String> collectorHosts) { - if(collectorHosts.size() ==0) { - LOG.warn("No metrics collectors alive, please check collector status!"); - return ""; - } + if (collectorHosts.isEmpty()) return null; long index = hostnameHash % collectorHosts.size(); index = index < 0 ? index + collectorHosts.size() : index; String collectorHost = collectorHosts.get((int) index); diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java index f3f8a62..d494a8e 100644 --- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java +++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java @@ -30,7 +30,13 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.expect; @@ -204,6 +210,41 @@ public class AbstractTimelineMetricSinkTest { verifyAll(); } + @Test + public void testGetCurrentCollectorHostMultiThreaded() { + final int threadPoolSize = 32; + final int numberOfThreads = threadPoolSize * 8; + final AtomicBoolean stop = new AtomicBoolean(false); + final TestTimelineMetricsThreadingSink sink = new TestTimelineMetricsThreadingSink(); + final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); + int threadCount = 0; + + try { + while (!stop.get() && threadCount < numberOfThreads) { + executorService.execute(() -> { + try { + String host = sink.getCurrentCollectorHost(); + } catch (Exception e) { + e.printStackTrace(); + stop.set(true); + executorService.shutdownNow(); + } + }); + + threadCount++; + } + + executorService.awaitTermination(30, TimeUnit.SECONDS); + if(stop.get()) { + Assert.fail("Unexpected exception(s) has been thrown! See the stack traces above!"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + executorService.shutdown(); + } + } + private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink { @Override protected String getCollectorUri(String host) { @@ -255,4 +296,29 @@ public class AbstractTimelineMetricSinkTest { return "http"; } } + + private class TestTimelineMetricsThreadingSink extends TestTimelineMetricsSink { + private final AtomicInteger counter = new AtomicInteger(0); + private final Collection<String> hosts = Collections.singletonList("host1"); + + @Override + protected Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException { + failedCollectorConnectionsCounter.set(4); + int c = counter.getAndIncrement(); + if (c % 2 == 0 || c % 3 == 0) { + throw new MetricCollectorUnavailableException("MetricCollectorUnavailable"); + } + return hosts; + } + + @Override + protected Collection<String> getConfiguredCollectorHosts() { + return hosts; + } + + @Override + protected SupplierExpiry getSupplierExpiry() { + return new SupplierExpiry(128, TimeUnit.MILLISECONDS); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org For additional commands, e-mail: commits-h...@ambari.apache.org