Add retry to metadata cache sync broadcast

Signed-off-by: shaofengshi <shaofeng...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ecc01458
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ecc01458
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ecc01458

Branch: refs/heads/ranger
Commit: ecc01458c4ad361aaf863505884d51474a8fec9d
Parents: 32d9d23
Author: lichao <cha...@mobvoi.com>
Authored: Sun Sep 10 03:03:23 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Sep 11 11:09:29 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  6 ++++-
 .../main/resources/kylin-defaults.properties    |  3 +++
 .../kylin/metadata/cachesync/Broadcaster.java   | 27 +++++++++++++++++++-
 .../kylin/rest/service/CacheServiceTest.java    |  4 +++
 4 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

NOTE(review): the Broadcaster.java hunk below was amended during review:
  - the IOException is now passed to SLF4J as the trailing Throwable, so its
    stack trace is logged instead of being formatted into a placeholder;
  - an interrupted re-queue now restores the thread's interrupt flag;
  - an event dropped after the retry limit is logged at WARN, not INFO;
  - the "broacastEvent" typo in the log text is gone.
  Line counts are unchanged, so the diffstat above still holds. The text
  no longer matches commit ecc01458 byte-for-byte.
NOTE(review): not addressed here. When several REST servers fail for the
  same event, the same BroadcastEvent object is re-queued once per failed
  server. Each dequeue increments its shared retryTime, so the limit is
  consumed several times per round, and servers that already succeeded
  are wiped again. This over-delivery is what the counter decrement in
  CacheServiceTest works around. Consider re-queueing one event per
  failed node instead.

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ff76be2..7d20648 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -253,6 +253,10 @@ abstract public class KylinConfigBase implements Serializable {
         return StorageURL.valueOf(getOptional("kylin.metadata.url", "kylin_metadata@hbase"));
     }
 
+    public int getCacheSyncRetrys() {
+        return Integer.parseInt(getOptional("kylin.metadata.sync.retries", "3"));
+    }
+
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);
@@ -346,7 +350,7 @@ abstract public class KylinConfigBase implements Serializable {
     public String getSegmentAdvisor() {
         return getOptional("kylin.cube.segment-advisor", "org.apache.kylin.cube.CubeSegmentAdvisor");
     }
-    
+
     public double getJobCuboidSizeRatio() {
         return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index cf0d226..31ed60e 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -20,6 +20,9 @@
 # The metadata store in hbase
 kylin.metadata.url=kylin_metadata@hbase
 
+# Max attempts to broadcast a metadata cache sync event before it is dropped
+kylin.metadata.sync.retries=3
+
 # Working folder in HDFS, better be qualified absolute path, make sure user has the right permission to this directory
 kylin.env.hdfs-working-dir=/kylin
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 00b8857..532ae74 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -112,6 +112,7 @@ public class Broadcaster {
 
     private Broadcaster(final KylinConfig config) {
         this.config = config;
+        final int retryLimitTimes = config.getCacheSyncRetrys();
         final String[] nodes = config.getRestServers();
 
         if (nodes == null || nodes.length < 1) {
@@ -129,6 +130,12 @@ public class Broadcaster {
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
+                        broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
+                        if (broadcastEvent.getRetryTime() > retryLimitTimes) {
+                            logger.warn("Retry limit {} reached, dropping {}", retryLimitTimes, broadcastEvent);
+                            continue;
+                        }
+
                         String[] restServers = config.getRestServers();
                         logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
                         for (final String node : restServers) {
@@ -146,7 +153,16 @@ public class Broadcaster {
                                         restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
                                                 broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
-                                        logger.warn("Thread failed during wipe cache at " + broadcastEvent, e);
+                                        logger.warn("Thread failed during wipe cache at {}, will re-queue it",
+                                                broadcastEvent, e);
+                                        // re-queue for another attempt; the main loop drops it past the limit
+                                        try {
+                                            broadcastEvents.putLast(broadcastEvent);
+                                        } catch (InterruptedException ex) {
+                                            Thread.currentThread().interrupt();
+                                            logger.warn("Interrupted while re-queueing failed broadcastEvent {}",
+                                                    broadcastEvent, ex);
+                                        }
                                     }
                                 }
                             });
@@ -322,6 +338,7 @@ public class Broadcaster {
     }
 
     public static class BroadcastEvent {
+        private int retryTime;
         private String entity;
         private String event;
         private String cacheKey;
@@ -333,6 +350,14 @@ public class Broadcaster {
             this.cacheKey = cacheKey;
         }
 
+        public int getRetryTime() {
+            return retryTime;
+        }
+
+        public void setRetryTime(int retryTime) {
+            this.retryTime = retryTime;
+        }
+
         public String getEntity() {
             return entity;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 704e45d..ccc8edc 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -155,6 +155,10 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     private void waitForCounterAndClear(long count) {
         int retryTimes = 0;
         while ((!counter.compareAndSet(count, 0L))) {
+            // take into account wipe retry causing counter larger than count
+            if (counter.get() > count) {
+                counter.decrementAndGet();
+            }
             if (++retryTimes > 30) {
                 throw new RuntimeException("timeout");
             }