Repository: kylin Updated Branches: refs/heads/2.x-staging 0ec3ed0e8 -> 9021f17d8
initial commit for KYLIN-1431 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9021f17d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9021f17d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9021f17d Branch: refs/heads/2.x-staging Commit: 9021f17d85be01bf34b48a7a31be82f53ceb9c8f Parents: 0ec3ed0 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Mar 2 11:16:46 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Mar 8 15:07:04 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/job/CubeMetaExtractor.java | 2 +- build/bin/streaming_build.sh | 4 +- build/bin/streaming_fillgap.sh | 5 +- .../kylin/metadata/model/ISourceAware.java | 1 + .../kylin/engine/streaming/BootstrapConfig.java | 20 +- .../kylin/engine/streaming/IStreamingInput.java | 3 +- .../streaming/OneOffStreamingBuilder.java | 17 +- .../kylin/engine/streaming/StreamingConfig.java | 33 +-- .../engine/streaming/StreamingManager.java | 12 + .../engine/streaming/cli/StreamingCLI.java | 21 +- .../engine/streaming/util/StreamingUtils.java | 18 +- .../kafka/default.streaming_table.json | 21 ++ .../localmeta/kafka/kafka_test.json | 20 -- .../kafka/test_streaming_table_cube.json | 22 -- .../kafka/test_streaming_table_ii.json | 22 -- .../streaming/default.streaming_table.json | 6 + .../localmeta/streaming/kafka_test.json | 20 -- .../streaming/test_streaming_table_cube.json | 8 - .../streaming/test_streaming_table_ii.json | 8 - .../kylin/provision/BuildCubeWithStream.java | 16 +- .../kylin/rest/controller/CubeController.java | 234 ------------------- .../rest/controller/StreamingController.java | 4 +- .../kylin/rest/service/StreamingService.java | 18 +- .../kylin/source/kafka/KafkaStreamingInput.java | 78 ++++--- .../kylin/source/kafka/StreamingParser.java | 6 +- 25 files changed, 163 insertions(+), 456 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java index 527ef0a..ef27ade 100644 --- a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java +++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java @@ -227,7 +227,7 @@ public class CubeMetaExtractor extends AbstractApplication { private void dealWithStreaming(CubeInstance cube) { for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) { - if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) { + if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) { requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName())); requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName())); } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/build/bin/streaming_build.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh index a96ecc1..ed19036 100644 --- a/build/bin/streaming_build.sh +++ b/build/bin/streaming_build.sh @@ -20,7 +20,7 @@ source /etc/profile source ~/.bash_profile -STREAMING=$1 +CUBE=$1 INTERVAL=$2 DELAY=$3 CURRENT_TIME_IN_SECOND=`date +%s` @@ -30,4 +30,4 @@ END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL)) ID="$START"_"$END" echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING} \ No newline at end of file +sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/build/bin/streaming_fillgap.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh index 74d9037..c67809a 100644 --- a/build/bin/streaming_fillgap.sh +++ b/build/bin/streaming_fillgap.sh @@ -20,8 +20,7 @@ source /etc/profile source ~/.bash_profile -streaming=$1 -margin=$2 +cube=$1 cd ${KYLIN_HOME} -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin} \ No newline at end of file +sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java index 3d89f40..8cfda15 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java @@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model; public interface ISourceAware { public static final int ID_HIVE = 0; + public static final int ID_STREAMING = 1; public static final int ID_SPARKSQL = 5; int getSourceType(); http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java index a3e2db5..a4c4618 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java @@ -4,9 +4,7 @@ package org.apache.kylin.engine.streaming; */ public class BootstrapConfig { - private String streaming; - private int partitionId = -1; - + private String cubeName; private long start = 0L; private long end = 0L; @@ -28,20 +26,12 @@ public class BootstrapConfig { this.end = end; } - public String getStreaming() { - return streaming; - } - - public void setStreaming(String streaming) { - this.streaming = streaming; - } - - public int getPartitionId() { - return partitionId; + public String getCubeName() { + return cubeName; } - public void setPartitionId(int partitionId) { - this.partitionId = partitionId; + public void setCubeName(String cubeName) { + this.cubeName = cubeName; } public boolean isFillGap() { http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java index 1cf3d98..4b4cf02 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java @@ -34,11 +34,12 @@ package org.apache.kylin.engine.streaming; import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.metadata.realization.RealizationType; /** */ public interface IStreamingInput { - StreamingBatch getBatchWithTimeWindow(String streamingConfig, int id, long startTime, long endTime); + StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime); } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java index 3fbade2..6bad000 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java @@ -43,6 +43,7 @@ import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.base.Preconditions; +import org.apache.kylin.metadata.realization.RealizationType; /** */ @@ -53,23 +54,25 @@ public class OneOffStreamingBuilder { private final StreamingBatchBuilder streamingBatchBuilder; private final long startTime; private final long endTime; - private final String streamingConfig; + private final RealizationType realizationType; + private final String realizationName; - public OneOffStreamingBuilder(String streamingConfig, long startTime, long endTime) { + public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) { Preconditions.checkArgument(startTime < endTime); this.startTime = startTime; this.endTime = endTime; - this.streamingConfig = Preconditions.checkNotNull(streamingConfig); - this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput(streamingConfig)); - this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput(streamingConfig)); - this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(streamingConfig)); + this.realizationType = Preconditions.checkNotNull(realizationType); + this.realizationName = Preconditions.checkNotNull(realizationName); + this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput()); + this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput()); + this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName)); } public Runnable build() { return new Runnable() { @Override public void run() { - StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(streamingConfig, -1, startTime, endTime); + StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime); final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch); final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch); final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable); http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java index f0a7ab1..c8d1911 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java @@ -55,29 +55,24 @@ public class StreamingConfig extends RootPersistentEntity { public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class); + public static final String STREAMING_TYPE_KAFKA = "kafka"; + @JsonProperty("name") private String name; - @JsonProperty("iiName") - private String iiName; - - @JsonProperty("cubeName") - private String cubeName; + @JsonProperty("type") + private String type = STREAMING_TYPE_KAFKA; - public String getCubeName() { - return cubeName; + public String getType() { + return type; } - public void setCubeName(String cubeName) { - this.cubeName = cubeName; + public void setType(String type) { + this.type = type; } - public String getIiName() { - return iiName; - } - - public void setIiName(String iiName) { - this.iiName = iiName; + public String getResourcePath() { + return concatResourcePath(name); } public String getName() { @@ -88,12 +83,8 @@ public class StreamingConfig extends RootPersistentEntity { this.name = name; } - public String getResourcePath() { - return concatResourcePath(name); - } - - public static String concatResourcePath(String streamingName) { - return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json"; + public static String concatResourcePath(String name) { + return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java index e0b086d..f652762 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java @@ -104,6 +104,18 @@ public class StreamingManager { } } + private static String formatStreamingConfigPath(String name) { + return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json"; + } + + private static String formatStreamingOutputPath(String streaming, int partition) { + return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json"; + } + + private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) { + return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json"; + } + public StreamingConfig getStreamingConfig(String name) { return streamingMap.get(name); } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java index a73a6ac..0bab396 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -44,6 +44,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; +import org.apache.kylin.metadata.realization.RealizationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ public class StreamingCLI { public static void main(String[] args) { try { - Preconditions.checkArgument(args[0].equals("streaming")); + Preconditions.checkArgument(args[0].equals("cube")); Preconditions.checkArgument(args[1].equals("start")); int i = 2; @@ -69,11 +70,8 @@ public class StreamingCLI { case "-end": bootstrapConfig.setEnd(Long.parseLong(args[++i])); break; - case "-streaming": - bootstrapConfig.setStreaming(args[++i]); - break; - case "-partition": - bootstrapConfig.setPartitionId(Integer.parseInt(args[++i])); + case "-cube": + bootstrapConfig.setCubeName(args[++i]); break; case "-fillGap": bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i])); @@ -84,14 +82,13 @@ public class StreamingCLI { i++; } if (bootstrapConfig.isFillGap()) { - final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming()); - final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); + final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName()); logger.info("all gaps:" + StringUtils.join(gaps, ",")); for (Pair<Long, Long> gap : gaps) { - startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond()); + startOneOffCubeStreaming(bootstrapConfig.getCubeName(), gap.getFirst(), gap.getSecond()); } } else { - startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()); + startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()); logger.info("streaming process finished, exit with 0"); System.exit(0); } @@ -102,8 +99,8 @@ public class StreamingCLI { } } - private static void startOneOffCubeStreaming(String streaming, long start, long end) { - final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build(); + private static void startOneOffCubeStreaming(String cubeName, long start, long end) { + final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build(); runnable.run(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java index 0ae7143..66a0af2 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java @@ -43,29 +43,27 @@ import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder; import com.google.common.base.Preconditions; +import org.apache.kylin.metadata.realization.RealizationType; /** * TODO: like MRUtil, use Factory pattern to allow config */ public class StreamingUtils { - public static IStreamingInput getStreamingInput(String streaming) { + public static IStreamingInput getStreamingInput() { return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput"); } - public static IStreamingOutput getStreamingOutput(String streaming) { + public static IStreamingOutput getStreamingOutput() { return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput"); } - public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) { - final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streaming); - Preconditions.checkNotNull(streamingConfig); - if (streamingConfig.getCubeName() != null) { - return new StreamingCubeBuilder(streamingConfig.getCubeName()); - } else if (streamingConfig.getIiName() != null) { - throw new UnsupportedOperationException("not implemented yet"); + public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) { + Preconditions.checkNotNull(realizationName); + if (realizationType == RealizationType.CUBE) { + return new StreamingCubeBuilder(realizationName); } else { - throw new UnsupportedOperationException("StreamingConfig is not valid"); + throw new UnsupportedOperationException("not implemented yet"); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/default.streaming_table.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json new file mode 100644 index 0000000..c99b8e5 --- /dev/null +++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json @@ -0,0 +1,21 @@ +{ + "version":"2.1", + "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", + "name": "default.streaming_table", + "topic": "test_streaming_table_topic_xyz", + "timeout": 60000, + "bufferSize": 65536, + "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser", + "last_modified": 0, + "clusters": [ + { + "brokers": [ + { + "id": 0, + "host": "sandbox", + "port": 6667 + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/kafka_test.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/kafka_test.json b/examples/test_case_data/localmeta/kafka/kafka_test.json deleted file mode 100644 index a20f71e..0000000 --- a/examples/test_case_data/localmeta/kafka/kafka_test.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "version":"2.1", - "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d", - "name": "kafka_test", - "topic": "kafka_stream_test", - "timeout": 60000, - "bufferSize": 65536, - "last_modified": 0, - "clusters": [ - { - "brokers": [ - { - "id": 0, - "host": "sandbox.hortonworks.com", - "port": 6667 - } - ] - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json deleted file mode 100644 index 554fa62..0000000 --- a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "version":"2.1", - "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "test_streaming_table_cube", - "topic": "test_streaming_table_topic_xyz", - "timeout": 60000, - "bufferSize": 65536, - "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser", - "partition": 1, - "last_modified": 0, - "clusters": [ - { - "brokers": [ - { - "id": 0, - "host": "sandbox", - "port": 6667 - } - ] - } - ] -} http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json deleted file mode 100644 index b6f18c7..0000000 --- a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "version":"2.1", - "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322", - "name": "test_streaming_table_ii", - "topic": "test_streaming_table_topic_xyz", - "timeout": 60000, - "bufferSize": 65536, - "parserName": "org.apache.kylin.source.kafka.JsonStreamParser", - "partition": 1, - "last_modified": 0, - "clusters": [ - { - "brokers": [ - { - "id": 0, - "host": "sandbox", - "port": 6667 - } - ] - } - ] -} http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/default.streaming_table.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json new file mode 100644 index 0000000..6eb4a88 --- /dev/null +++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json @@ -0,0 +1,6 @@ +{ + "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", + "name": "default.streaming_table", + "type": "kafka", + "last_modified": 0 +} http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/kafka_test.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/kafka_test.json b/examples/test_case_data/localmeta/streaming/kafka_test.json deleted file mode 100644 index a20f71e..0000000 --- a/examples/test_case_data/localmeta/streaming/kafka_test.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "version":"2.1", - "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d", - "name": "kafka_test", - "topic": "kafka_stream_test", - "timeout": 60000, - "bufferSize": 65536, - "last_modified": 0, - "clusters": [ - { - "brokers": [ - { - "id": 0, - "host": "sandbox.hortonworks.com", - "port": 6667 - } - ] - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json deleted file mode 100644 index ecf0511..0000000 --- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "version":"2.1", - "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "test_streaming_table_cube", - "cubeName": "test_streaming_table_cube", - "partition": 1, - "last_modified": 0 -} http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json deleted file mode 100644 index 022ab70..0000000 --- a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "version":"2.1", - "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322", - "name": "test_streaming_table_ii", - "iiName": "test_streaming_table_ii", - "partition": 1, - "last_modified": 0 -} http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 27226e7..eeff999 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -27,10 +27,13 @@ import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; @@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory; public class BuildCubeWithStream { private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class); - private static final String streamingName = "test_streaming_table_cube"; + private static final String cubeName = "test_streaming_table_cube"; private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00"); private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00"); private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours @@ -75,15 +78,16 @@ public class BuildCubeWithStream { DeployUtil.overrideJobJarLocations(); kylinConfig = KylinConfig.getInstanceFromEnv(); - - final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final String factTable = cubeInstance.getFactTable(); + final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable); //Use a random topic for kafka data stream - KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName); + KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName()); streamingConfig.setTopic(UUID.randomUUID().toString()); KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig); + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig); } public static void afterClass() throws Exception { @@ -94,7 +98,7 @@ public class BuildCubeWithStream { logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); for (long start = startTime; start < endTime; start += batchInterval) { logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval)); - new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run(); + new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run(); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 9afa750..e60f330 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -284,92 +284,6 @@ public class CubeController extends BasicController { throw new InternalErrorException("Failed to clone cube ", e); } - boolean isStreamingCube = false, cloneStreamingConfigSuccess = false, cloneKafkaConfigSuccess = false; - - - List<StreamingConfig> streamingConfigs = null; - try { - streamingConfigs = streamingService.listAllStreamingConfigs(cubeName); - if (streamingConfigs.size() != 0) { - isStreamingCube = true; - } - - } catch (IOException e) { - e.printStackTrace(); - } - - StreamingConfig newStreamingConfig = null; - KafkaConfig newKafkaConfig = null; - - try { - - if (isStreamingCube) { - - isStreamingCube = true; - newStreamingConfig = streamingConfigs.get(0).clone(); - newStreamingConfig.setName(newCubeName + "_STREAMING"); - newStreamingConfig.updateRandomUuid(); - newStreamingConfig.setLastModified(0); - newStreamingConfig.setCubeName(newCubeName); - try { - streamingService.createStreamingConfig(newStreamingConfig); - cloneStreamingConfigSuccess = true; - } catch (IOException e) { - throw new InternalErrorException("Failed to clone streaming config. ", e); - } - - //StreamingConfig name and KafkaConfig name is the same for same cube - String kafkaConfigName = streamingConfigs.get(0).getName(); - KafkaConfig kafkaConfig = null; - try { - kafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfigName); - if (kafkaConfig != null) { - newKafkaConfig = kafkaConfig.clone(); - newKafkaConfig.setName(newStreamingConfig.getName()); - newKafkaConfig.setLastModified(0); - newKafkaConfig.updateRandomUuid(); - } - } catch (IOException e) { - throw new InternalErrorException("Failed to get kafka config info. ", e); - } - - try { - kafkaConfigService.createKafkaConfig(newKafkaConfig); - cloneKafkaConfigSuccess = true; - } catch (IOException e) { - throw new InternalErrorException("Failed to clone streaming config. ", e); - } - } - } finally { - - //rollback if failed - if (isStreamingCube) { - if (cloneStreamingConfigSuccess == false || cloneKafkaConfigSuccess == false) { - try { - cubeService.deleteCube(newCube); - } catch (Exception ex) { - throw new InternalErrorException("Failed, and failed to rollback on delete cube. " + " Caused by: " + ex.getMessage(), ex); - } - if (cloneStreamingConfigSuccess == true) { - try { - streamingService.dropStreamingConfig(newStreamingConfig); - } catch (IOException e) { - throw new InternalErrorException("Failed to clone cube, and StreamingConfig created and failed to delete: " + e.getLocalizedMessage()); - } - } - if (cloneKafkaConfigSuccess == true) { - try { - kafkaConfigService.dropKafkaConfig(newKafkaConfig); - } catch (IOException e) { - throw new InternalErrorException("Failed to clone cube, and KafkaConfig created and failed to delete: " + e.getLocalizedMessage()); - } - } - - } - - } - } - return newCube; } @@ -400,27 +314,6 @@ public class CubeController extends BasicController { throw new NotFoundException("Cube with name " + cubeName + " not found.."); } - //drop related StreamingConfig KafkaConfig if exist - try { - List<StreamingConfig> configs = streamingService.listAllStreamingConfigs(cubeName); - for (StreamingConfig config : configs) { - try { - streamingService.dropStreamingConfig(config); - } catch (IOException e) { - logger.error(e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e); - } - try { - KafkaConfig kfkConfig = kafkaConfigService.getKafkaConfig(config.getName()); - kafkaConfigService.dropKafkaConfig(kfkConfig); - } catch (IOException e) { - throw new InternalErrorException("Failed to delete KafkaConfig. " + " Caused by: " + e.getMessage(), e); - } - } - } catch (IOException e) { - e.printStackTrace(); - } - //drop Cube try { cubeService.deleteCube(cube); @@ -587,133 +480,6 @@ public class CubeController extends BasicController { return cubeRequest; } - boolean updateStreamingConfigSuccess = false, updateKafkaConfigSuccess = false; - - boolean isStreamingCube = cubeRequest.getStreamingCube() != null && cubeRequest.getStreamingCube().equals("true"); - - //oldConfig is for recover use - StreamingConfig streamingConfig = null, oldStreamingConfig = null; - KafkaConfig kafkaConfig = null, oldKafkaConfig = null; - if (isStreamingCube) { - streamingConfig = deserializeStreamingDesc(cubeRequest); - kafkaConfig = deserializeKafkaDesc(cubeRequest); - try { - oldKafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfig.getName()); - } catch (IOException e) { - e.printStackTrace(); - } - oldStreamingConfig = streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName()); - } - try { - //streaming Cube - if (isStreamingCube) { - if (streamingConfig == null) { - cubeRequest.setMessage("No StreamingConfig info to update."); - return cubeRequest; - } - if (kafkaConfig == null) { - cubeRequest.setMessage("No KafkaConfig info to update."); - return cubeRequest; - } - - if (oldStreamingConfig == null) { - streamingConfig.setUuid(UUID.randomUUID().toString()); - try { - streamingService.createStreamingConfig(streamingConfig); - updateStreamingConfigSuccess = true; - } catch (IOException e) { - logger.error("Failed to add StreamingConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to add StreamingConfig: " + e.getLocalizedMessage()); - } - } else { - try { - streamingConfig = streamingService.updateStreamingConfig(streamingConfig); - updateStreamingConfigSuccess = true; - - } catch (IOException e) { - logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage()); - } - } - if (oldKafkaConfig == null) { - kafkaConfig.setUuid(UUID.randomUUID().toString()); - try { - kafkaConfigService.createKafkaConfig(kafkaConfig); - updateKafkaConfigSuccess = true; - } catch (IOException e) { - logger.error("Failed to add KafkaConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to add KafkaConfig: " + e.getLocalizedMessage()); - } - - } else { - try { - kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig); - updateKafkaConfigSuccess = true; - } catch (IOException e) { - logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage()); - } - } - - } - } finally { - if (isStreamingCube) { - //recover cube desc - if (updateStreamingConfigSuccess == false || updateKafkaConfigSuccess == false) { - oldCubeDesc.setLastModified(desc.getLastModified()); - CubeInstance cube = cubeService.getCubeManager().getCube(cubeRequest.getCubeName()); - try { - desc = cubeService.updateCubeAndDesc(cube, oldCubeDesc, projectName); - } catch (Exception e) { - logger.error("Failed to recover CubeDesc:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to recover CubeDesc: " + e.getLocalizedMessage()); - } - - if (updateStreamingConfigSuccess == true) { - - if (oldStreamingConfig != null) { - - oldStreamingConfig.setLastModified(streamingConfig.getLastModified()); - try { - streamingService.updateStreamingConfig(oldStreamingConfig); - } catch (IOException e) { - logger.error("Failed to recover StreamingConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to recover StreamingConfig: " + e.getLocalizedMessage()); - } - } else { - try { - streamingService.dropStreamingConfig(streamingConfig); - } catch (IOException e) { - logger.error("Failed to remove added StreamingConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to remove added StreamingConfig: " + e.getLocalizedMessage()); - } - } - } - - if (updateKafkaConfigSuccess == true) { - if (oldKafkaConfig != null) { - oldKafkaConfig.setLastModified(kafkaConfig.getLastModified()); - try { - kafkaConfigService.updateKafkaConfig(oldKafkaConfig); - } catch (IOException e) { - logger.error("Failed to recover KafkaConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to recover KafkaConfig: " + e.getLocalizedMessage()); - } - } else { - try { - kafkaConfigService.dropKafkaConfig(kafkaConfig); - } catch (IOException e) { - logger.error("Failed to remove added KafkaConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to remove added KafkaConfig: " + e.getLocalizedMessage()); - } - } - } - - } - } - - } - String descData = JsonUtil.writeValueAsIndentString(desc); cubeRequest.setCubeDescData(descData); cubeRequest.setSuccessful(true); http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index e22bd30..ecd7571 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -60,9 +60,9 @@ public class StreamingController extends BasicController { @RequestMapping(value = "/getConfig", method = { RequestMethod.GET }) @ResponseBody - public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { + public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { try { - return streamingService.getStreamingConfigs(cubeName, limit, offset); + return streamingService.getStreamingConfigs(table, limit, offset); } catch (IOException e) { logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e); throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage()); http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java index e40426b..a0473e9 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -18,6 +18,8 @@ package org.apache.kylin.rest.service; +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.rest.constant.Constant; @@ -37,26 +39,22 @@ public class StreamingService extends BasicService { private AccessService accessService; @PostFilter(Constant.ACCESS_POST_FILTER_READ) - public List<StreamingConfig> listAllStreamingConfigs(final String cubeName) throws IOException { + public List<StreamingConfig> listAllStreamingConfigs(final String table) throws IOException { List<StreamingConfig> streamingConfigs = new ArrayList(); - CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null; - if (null == cubeInstance) { + if (StringUtils.isEmpty(table)) { streamingConfigs = getStreamingManager().listAllStreaming(); } else { - for(StreamingConfig config : getStreamingManager().listAllStreaming()){ - if(cubeInstance.getName().equals(config.getCubeName())){ - streamingConfigs.add(config); - } - } + StreamingConfig config = getStreamingManager().getConfig(table); + streamingConfigs.add(config); } return streamingConfigs; } - public List<StreamingConfig> getStreamingConfigs(final String cubeName, final Integer limit, final Integer offset) throws IOException { + public List<StreamingConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset) throws IOException { List<StreamingConfig> streamingConfigs; - streamingConfigs = listAllStreamingConfigs(cubeName); + streamingConfigs = listAllStreamingConfigs(table); if (limit == null || offset == null) { return streamingConfigs; http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index 2e262b3..c05119f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -47,9 +47,14 @@ import kafka.message.MessageAndOffset; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.streaming.IStreamingInput; import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.util.KafkaRequester; @@ -65,39 +70,54 @@ public class KafkaStreamingInput implements IStreamingInput { private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class); @Override - public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) { - try { + public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) { + if (realizationType != RealizationType.CUBE) { + throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType); + } + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName); + final String streaming = cube.getFactTable(); + final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); + final StreamingConfig streamingConfig = streamingManager.getConfig(streaming); + if (streamingConfig == null) { + throw new IllegalArgumentException("Table " + streaming + " is not a streaming table."); + } + if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) { logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime)); - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); - final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming); - final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig); - final ExecutorService executorService = Executors.newCachedThreadPool(); - final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList(); - for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) { - final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size(); - for (int i = 0; i < partitionCount; ++i) { - final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser); - final Future<List<StreamingMessage>> future = executorService.submit(producer); - futures.add(future); + + try { + final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); + final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming); + final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, realizationName); + final ExecutorService executorService = Executors.newCachedThreadPool(); + final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList(); + for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) { + final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size(); + for (int i = 0; i < partitionCount; ++i) { + final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser); + final Future<List<StreamingMessage>> future = executorService.submit(producer); + futures.add(future); + } } - } - List<StreamingMessage> messages = Lists.newLinkedList(); - for (Future<List<StreamingMessage>> future : futures) { - try { - messages.addAll(future.get()); - } catch (InterruptedException e) { - logger.warn("this thread should not be interrupted, just ignore", e); - continue; - } catch (ExecutionException e) { - throw new RuntimeException("error when get StreamingMessages",e.getCause()); + List<StreamingMessage> messages = Lists.newLinkedList(); + for (Future<List<StreamingMessage>> future : futures) { + try { + messages.addAll(future.get()); + } catch (InterruptedException e) { + logger.warn("this thread should not be interrupted, just ignore", e); + continue; + } catch (ExecutionException e) { + throw new RuntimeException("error when get StreamingMessages", e.getCause()); + } } + final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime); + logger.info("finish to get streaming batch, total message count:" + messages.size()); + return new StreamingBatch(messages, timeRange); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("failed to create instance of StreamingParser", e); } - final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime); - logger.info("finish to get streaming batch, total message count:" + messages.size()); - return new StreamingBatch(messages, timeRange); - } catch (ReflectiveOperationException e) { - throw new RuntimeException("failed to create instance of StreamingParser", e); + } else { + throw new IllegalArgumentException("kafka is the only supported streaming type."); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 3455f1d..7b326e2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -50,6 +50,7 @@ import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.IntermediateColumnDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.config.KafkaConfig; import com.google.common.base.Function; @@ -68,9 +69,8 @@ public abstract class StreamingParser { abstract public boolean filter(StreamingMessage streamingMessage); - public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig) throws ReflectiveOperationException { - final String cubeName = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(kafkaConfig.getName()).getCubeName(); - final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig, RealizationType realizationType, String realizationName) throws ReflectiveOperationException { + final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(realizationName); List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() { @Nullable @Override