bug fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/487c0af0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/487c0af0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/487c0af0 Branch: refs/heads/0.8.0 Commit: 487c0af027ade5603fe7a951cb4ff2f80c5aa12e Parents: 6d6a7c1 Author: honma <ho...@ebay.com> Authored: Tue Jun 2 18:23:11 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Tue Jun 2 22:25:55 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/job/streaming/CubeStreamBuilder.java | 4 ++-- .../org/apache/kylin/job/streaming/StreamingBootstrap.java | 6 +++++- .../main/java/org/apache/kylin/streaming/StreamBuilder.java | 9 +++++++-- 3 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java index 2831caa..3c98464 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java @@ -370,11 +370,11 @@ public class CubeStreamBuilder extends StreamBuilder { @Override protected int batchInterval() { - return 5 * 60 * 1000;//30 min + return 5 * 60 * 1000;//5 min } @Override protected int batchSize() { - return 1000; + return 10000; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java index ee00880..dcfa774 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java @@ -130,7 +130,7 @@ public class StreamingBootstrap { private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) { List<BlockingQueue<StreamMessage>> result = Lists.newArrayList(); - for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) { + for (int partitionId = 0; partitionId < partitionCount; ++partitionId) { final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId); final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig); @@ -153,10 +153,14 @@ public class StreamingBootstrap { Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { + int totalMessage = 0; while (true) { for (BlockingQueue<StreamMessage> queue : queues) { try { streamQueue.put(queue.take()); + if (totalMessage++ % 10000 == 1) { + logger.info("Total stream message count: " + totalMessage); + } } catch (InterruptedException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/487c0af0/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java index cb5dc1d..3008722 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java @@ -35,7 +35,6 @@ package org.apache.kylin.streaming; import com.google.common.collect.Lists; -import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +49,6 @@ public abstract class StreamBuilder implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class); - private StreamParser streamParser = StringStreamParser.instance; private StreamFilter streamFilter = DefaultStreamFilter.instance; @@ -84,6 +82,7 @@ public abstract class StreamBuilder implements Runnable { public void run() { try { List<List<String>> parsedStreamMessages = null; + int filteredMsgCount = 0; while (true) { if (parsedStreamMessages == null) { parsedStreamMessages = Lists.newLinkedList(); @@ -113,6 +112,11 @@ public abstract class StreamBuilder implements Runnable { final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage); if (getStreamFilter().filter(parsedStreamMessage)) { + + if (filteredMsgCount++ % 10000 == 1) { + logger.info("Total stream message count: " + filteredMsgCount); + } + if (startOffset > parsedStreamMessage.getOffset()) { startOffset = parsedStreamMessage.getOffset(); } @@ -158,5 +162,6 @@ public abstract class StreamBuilder implements Runnable { } protected abstract int batchInterval(); + protected abstract int batchSize(); }