Fix BuildCubeWithStream test case that may lose records
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/530549cf Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/530549cf Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/530549cf Branch: refs/heads/master-hbase0.98 Commit: 530549cf92c0f20d3247da563ca40ad4962c8ba4 Parents: 77fe581 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Mar 12 11:09:07 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Mar 12 11:13:23 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/provision/BuildCubeWithStream.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/530549cf/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index f3b1ec9..2880bf4 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -93,7 +93,8 @@ public class BuildCubeWithStream { private final String kafkaZkPath = "/" + UUID.randomUUID().toString(); protected static boolean fastBuildMode = false; - private boolean generateData = true; + private volatile boolean generateData = true; + private volatile boolean generateDataDone = false; private static final int BUILD_ROUND = 5; @@ -182,11 +183,12 @@ public class BuildCubeWithStream { try { generateStreamData(dateStart, dateEnd, rand.nextInt(100)); dateStart = dateEnd; - sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time + sleep(rand.nextInt(rand.nextInt(50)) * 1000); // wait random time } 
catch (Exception e) { e.printStackTrace(); } } + generateDataDone = true; } }).start(); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -197,7 +199,14 @@ public class BuildCubeWithStream { // stop generating message to kafka generateData = false; } - Thread.sleep(1 * 60 * 1000); // wait for new messages + int waittime = 0; + while (generateDataDone == false && waittime < 100) { + Thread.sleep(1000); + waittime++; + } + if (generateDataDone == false) { + throw new IllegalStateException("Timeout when wait all messages be sent to Kafka"); // ensure all messages have been flushed. + } FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() { @Override public ExecutableState call() {