refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f5c55d7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f5c55d7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f5c55d7b Branch: refs/heads/KYLIN-1011 Commit: f5c55d7b431486b785ccafb4de904d43809480b9 Parents: 55a85df Author: qianhao.zhou <[email protected]> Authored: Tue Sep 22 16:48:39 2015 +0800 Committer: qianhao.zhou <[email protected]> Committed: Tue Sep 22 17:29:32 2015 +0800 ---------------------------------------------------------------------- assembly/pom.xml | 5 - .../kylin/job/BuildCubeWithStreamTest.java | 35 +++-- .../apache/kylin/job/BuildIIWithStreamTest.java | 43 +++--- .../java/org/apache/kylin/job/DeployUtil.java | 32 ++--- .../job/ITKafkaBasedIIStreamBuilderTest.java | 85 ----------- .../kylin/job/hadoop/invertedindex/IITest.java | 39 ++--- .../job/streaming/CubeStreamConsumerTest.java | 90 ------------ .../kylin/job/streaming/KafkaDataLoader.java | 11 +- .../streaming/PeriodicalStreamBuilderTest.java | 144 ------------------- .../invertedindex/streaming/SliceBuilder.java | 81 +++++++++++ .../source/kafka/StringStreamingParser.java | 65 +++++++++ 11 files changed, 220 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/pom.xml ---------------------------------------------------------------------- diff --git a/assembly/pom.xml b/assembly/pom.xml index 99557fb..9f17913 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -49,11 +49,6 @@ <artifactId>kylin-invertedindex</artifactId> <version>${project.parent.version}</version> </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-streaming</artifactId> - <version>${project.parent.version}</version> - </dependency> <!-- Env & Test --> <dependency> 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java index b02b2f2..6bfd560 100644 --- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java @@ -34,21 +34,18 @@ package org.apache.kylin.job; -import java.io.File; -import java.io.IOException; -import java.util.UUID; - import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.job.streaming.BootstrapConfig; -import org.apache.kylin.job.streaming.StreamingBootstrap; +import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; import org.apache.kylin.storage.hbase.util.StorageCleanupJob; -import org.apache.kylin.streaming.StreamingConfig; -import org.apache.kylin.streaming.StreamingManager; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -56,6 +53,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.UUID; + /** * for streaming cubing case "test_streaming_table" */ @@ -84,12 +85,14 @@ public class BuildCubeWithStreamTest { kylinConfig = 
KylinConfig.getInstanceFromEnv(); - //Use a random toplic for kafka data stream - StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName); + final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName); + + //Use a random topic for kafka data stream + KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getStreamingConfig(streamingName); streamingConfig.setTopic(UUID.randomUUID().toString()); - StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig); + KafkaConfigManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig); + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig); } @AfterClass @@ -120,13 +123,7 @@ public class BuildCubeWithStreamTest { @Test public void test() throws Exception { for (long start = startTime; start < endTime; start += batchInterval) { - BootstrapConfig bootstrapConfig = new BootstrapConfig(); - bootstrapConfig.setStart(start); - bootstrapConfig.setEnd(start + batchInterval); - bootstrapConfig.setOneOff(true); - bootstrapConfig.setPartitionId(0); - bootstrapConfig.setStreaming(streamingName); - StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig); + new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run(); } } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java index 5ca3b29..89be628 100644 --- 
a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java @@ -39,11 +39,7 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.TimeZone; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -55,12 +51,16 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.streaming.StreamingBatch; +import org.apache.kylin.engine.streaming.StreamingMessage; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.model.IIDesc; import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; +import org.apache.kylin.invertedindex.streaming.SliceBuilder; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; @@ -69,9 +69,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.source.hive.HiveTableReader; import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.apache.kylin.streaming.StreamBuilder; -import org.apache.kylin.streaming.StreamMessage; -import org.apache.kylin.streaming.invertedindex.IIStreamConsumer; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ 
-184,32 +181,30 @@ public class BuildIIWithStreamTest { logger.info("measure:" + tblColRef.getName()); } } - LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>(); final IISegment segment = createSegment(iiName); String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; ToolRunner.run(new IICreateHTableJob(), args); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize()); + final IIDesc iiDesc = segment.getIIDesc(); + final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary()); List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn()); int count = sorted.size(); + ArrayList<StreamingMessage> messages = Lists.newArrayList(); for (String[] row : sorted) { - logger.info("another row: " + StringUtils.join(row, ",")); - queue.put(parse(row)); + if (messages.size() >= iiDesc.getSliceSize()) { + messages.add(parse(row)); + } else { + sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis()))); + messages = Lists.newArrayList(); + } + } + if (!messages.isEmpty()) { + sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis()))); } reader.close(); logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier()); - queue.put(StreamMessage.EOF); - final Future<?> future = executorService.submit(streamBuilder); - try { - future.get(); - } catch (Exception e) { - logger.error("stream build failed", e); - fail("stream build failed"); - } - logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier()); } @@ -225,8 +220,8 @@ 
public class BuildIIWithStreamTest { } } - private StreamMessage parse(String[] row) { - return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes()); + private StreamingMessage parse(String[] row) { + return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap()); } private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index d47a664..9ec4c88 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -18,14 +18,9 @@ package org.apache.kylin.job; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; - +import com.google.common.collect.Lists; +import kafka.message.Message; +import kafka.message.MessageAndOffset; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -35,6 +30,7 @@ import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.job.dataGen.FactTableGenerator; import org.apache.kylin.job.streaming.KafkaDataLoader; import org.apache.kylin.job.streaming.StreamingTableDataGenerator; @@ -43,15 +39,16 @@ import org.apache.kylin.metadata.model.ColumnDesc; import 
org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.HiveClient; -import org.apache.kylin.streaming.StreamMessage; -import org.apache.kylin.streaming.StreamingConfig; -import org.apache.kylin.streaming.TimedJsonStreamParser; +import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.TimedJsonStreamParser; +import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.maven.model.Model; import org.apache.maven.model.io.xpp3.MavenXpp3Reader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; +import java.io.*; +import java.util.List; public class DeployUtil { private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class); @@ -146,14 +143,13 @@ public class DeployUtil { deployHiveTables(); } - public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException { - CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName()); + public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException { + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable()); TableDesc tableDesc = cubeInstance.getFactTableDesc(); - //load into kafka - KafkaDataLoader.loadIntoKafka(streamingConfig, data); - logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic()); + KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data); + logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic()); //csv data for H2 use List<TblColRef> tableColumns = Lists.newArrayList(); @@ -163,7 +159,7 @@ 
public class DeployUtil { TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true"); StringBuilder sb = new StringBuilder(); for (String json : data) { - List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage(); + List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData(); sb.append(StringUtils.join(rowColumns, ",")); sb.append(System.getProperty("line.separator")); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java deleted file mode 100644 index 6a615cb..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. 
- * - * / - */ - -package org.apache.kylin.job; - -import java.io.File; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.job.streaming.StreamingBootstrap; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -@Ignore("this test case will break existing metadata store") -public class ITKafkaBasedIIStreamBuilderTest { - - private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class); - - private KylinConfig kylinConfig; - - @BeforeClass - public static void beforeClass() throws Exception { - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this - } - - @Before - public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - - kylinConfig = KylinConfig.getInstanceFromEnv(); - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - } - - @Test - public void test() throws Exception { - final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig); - bootstrap.start("eagle", 0); - Thread.sleep(30 * 60 * 1000); - logger.info("time is up, stop streaming"); - bootstrap.stop(); - Thread.sleep(5 * 1000); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java 
b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java index dcd460b..913a2f7 100644 --- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java +++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java @@ -10,12 +10,18 @@ import java.util.Set; import javax.annotation.Nullable; +import com.google.common.base.Function; +import kafka.message.Message; +import kafka.message.MessageAndOffset; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.FIFOIterable; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.engine.streaming.StreamingBatch; +import org.apache.kylin.engine.streaming.StreamingMessage; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.index.Slice; @@ -26,6 +32,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec; import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState; import org.apache.kylin.invertedindex.model.IIRow; import org.apache.kylin.invertedindex.model.KeyValueCodec; +import org.apache.kylin.invertedindex.streaming.SliceBuilder; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.ConstantTupleFilter; @@ -33,6 +40,8 @@ import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.kafka.StreamingParser; +import org.apache.kylin.source.kafka.StringStreamingParser; import 
org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; @@ -41,18 +50,11 @@ import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionar import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators; import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint; import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos; -import org.apache.kylin.streaming.MicroStreamBatch; -import org.apache.kylin.streaming.ParsedStreamMessage; -import org.apache.kylin.streaming.StreamMessage; -import org.apache.kylin.streaming.StreamParser; -import org.apache.kylin.streaming.StringStreamParser; -import org.apache.kylin.streaming.invertedindex.SliceBuilder; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -78,24 +80,23 @@ public class IITest extends LocalFileMetadataTestCase { this.ii = IIManager.getInstance(getTestConfig()).getII(iiName); this.iiDesc = ii.getDescriptor(); - List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() { + List<MessageAndOffset> messages = Lists.transform(Arrays.asList(inputData), new Function<String, MessageAndOffset>() { @Nullable @Override - public StreamMessage apply(String input) { - return new StreamMessage(System.currentTimeMillis(), input.getBytes()); + public MessageAndOffset apply(String input) { + return new MessageAndOffset(new Message(input.getBytes()), System.currentTimeMillis()); } }); - List<List<String>> parsedStreamMessages = Lists.newArrayList(); - StreamParser parser = StringStreamParser.instance; - - MicroStreamBatch batch = new MicroStreamBatch(0); - for 
(StreamMessage message : streamMessages) { - ParsedStreamMessage parsedStreamMessage = parser.parse(message); - if ((parsedStreamMessage.isAccepted())) { - batch.add(parsedStreamMessage); + final StreamingParser parser = StringStreamingParser.instance; + final List<StreamingMessage> streamingMessages = Lists.transform(messages, new Function<MessageAndOffset, StreamingMessage>() { + @Nullable + @Override + public StreamingMessage apply(@Nullable MessageAndOffset input) { + return parser.parse(input); } - } + }); + StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis())); iiRows = Lists.newArrayList(); final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch)); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java deleted file mode 100644 index be4fa26..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.apache.kylin.job.streaming; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; - -import org.apache.hadoop.util.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.cube.CubeInstance; -import 
org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.apache.kylin.streaming.StreamBuilder; -import org.apache.kylin.streaming.StreamMessage; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - */ -@Ignore -public class CubeStreamConsumerTest { - - private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class); - - private KylinConfig kylinConfig; - - private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready"; - - @BeforeClass - public static void beforeClass() throws Exception { - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this - } - - @Before - public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - - kylinConfig = KylinConfig.getInstanceFromEnv(); - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - // remove all existing segments - CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder); - - } - - @Test - public void test() throws Exception { - LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>(); - List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList(); - queues.add(queue); - StreamBuilder cubeStreamBuilder = 
StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000); - final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder); - loadDataFromLocalFile(queue, 100000); - future.get(); - } - - private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException { - BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt"))); - String line; - int count = 0; - while ((line = br.readLine()) != null && count++ < maxCount) { - final List<String> strings = Arrays.asList(line.split("\t")); - queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes())); - } - queue.put(StreamMessage.EOF); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java index 95fbc9d..c3caa9b 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java @@ -10,22 +10,21 @@ import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.commons.lang.StringUtils; -import org.apache.kylin.streaming.BrokerConfig; -import org.apache.kylin.streaming.KafkaClusterConfig; -import org.apache.kylin.streaming.StreamingConfig; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; +import org.apache.kylin.source.kafka.config.BrokerConfig; +import org.apache.kylin.source.kafka.config.KafkaClusterConfig; /** * Load prepared data into kafka(for test use) 
*/ public class KafkaDataLoader { - public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) { + public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) { - KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0); + KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0); String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { @Nullable @Override @@ -44,7 +43,7 @@ public class KafkaDataLoader { List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList(); for (int i = 0; i < messages.size(); ++i) { - KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i)); + KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); keyedMessages.add(keyedMessage); } producer.send(keyedMessages); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java deleted file mode 100644 index dc6d312..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java +++ /dev/null @@ -1,144 +0,0 @@ -package org.apache.kylin.job.streaming; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import 
java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.TimeUtil; -import org.apache.kylin.streaming.MicroStreamBatch; -import org.apache.kylin.streaming.MicroStreamBatchConsumer; -import org.apache.kylin.streaming.ParsedStreamMessage; -import org.apache.kylin.streaming.StreamBuilder; -import org.apache.kylin.streaming.StreamMessage; -import org.apache.kylin.streaming.StreamParser; -import org.apache.kylin.streaming.StreamingManager; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - */ -public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase { - - private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class); - - @Before - public void setup() { - this.createTestMetadata(); - - } - - @After - public void clear() { - this.cleanupTestMetadata(); - } - - private List<StreamMessage> prepareTestData(long start, long end, int count) { - double step = (double) (end - start) / (count - 1); - long ts = start; - int offset = 0; - ArrayList<StreamMessage> result = Lists.newArrayList(); - for (int i = 0; i < count - 1; ++i) { - result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes())); - ts += step; - } - result.add(new StreamMessage(offset++, String.valueOf(end).getBytes())); - assertEquals(count, result.size()); - assertEquals(start + "", new String(result.get(0).getRawData())); - assertEquals(end + "", new String(result.get(count - 1).getRawData())); - return result; - } - - @Test - public void test() throws ExecutionException, InterruptedException { - - List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList(); - queues.add(new LinkedBlockingQueue<StreamMessage>()); - queues.add(new 
LinkedBlockingQueue<StreamMessage>()); - - final long interval = 3000L; - final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval); - - final List<Integer> partitionIds = Lists.newArrayList(); - for (int i = 0; i < queues.size(); i++) { - partitionIds.add(i); - } - - final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() { - @Override - public void consume(MicroStreamBatch microStreamBatch) throws Exception { - logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset()); - } - - @Override - public void stop() { - logger.info("consumer stopped"); - } - }; - final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval); - - streamBuilder.setStreamParser(new StreamParser() { - @Override - public ParsedStreamMessage parse(StreamMessage streamMessage) { - return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true); - } - }); - - Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder); - long timeout = nextPeriodStart + interval; - int messageCount = 0; - int inPeriodMessageCount = 0; - int expectedOffset = 0; - logger.info("prepare to add StreamMessage"); - while (true) { - long ts = System.currentTimeMillis(); - if (ts >= timeout + interval) { - break; - } - if (ts >= nextPeriodStart && ts < timeout) { - inPeriodMessageCount++; - } - for (BlockingQueue<StreamMessage> queue : queues) { - queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes())); - } - if (expectedOffset == 0 && ts >= timeout) { - expectedOffset = messageCount - 1; - } - messageCount++; - Thread.sleep(10); - } - logger.info("totally put " + messageCount + " StreamMessages"); - logger.info("totally in period " + 
inPeriodMessageCount + " StreamMessages"); - - for (BlockingQueue<StreamMessage> queue : queues) { - queue.put(StreamMessage.EOF); - } - - future.get(); - - for (BlockingQueue<StreamMessage> queue : queues) { - queue.take(); - } - - final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds); - logger.info("offset:" + offsets); - for (Long offset : offsets.values()) { - assertEquals(expectedOffset, offset.longValue()); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java new file mode 100644 index 0000000..ba337c8 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.invertedindex.streaming; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.engine.streaming.StreamingBatch; +import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.invertedindex.index.BatchSliceMaker; +import org.apache.kylin.invertedindex.index.Slice; +import org.apache.kylin.invertedindex.index.TableRecord; +import org.apache.kylin.invertedindex.index.TableRecordInfo; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.util.IIDictionaryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.List; + +/** + */ +public final class SliceBuilder { + + private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class); + + private final BatchSliceMaker sliceMaker; + private final IIDesc iiDesc; + private final boolean useLocalDict; + + public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) { + this.iiDesc = desc; + this.sliceMaker = new BatchSliceMaker(desc, shard); + this.useLocalDict = useLocalDict; + } + + public Slice buildSlice(StreamingBatch microStreamBatch) { + final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() { + @Nullable + @Override + public List<String> apply(@Nullable StreamingMessage input) { + return input.getData(); + } + }); + final Dictionary<?>[] dictionaries = useLocalDict ? 
IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()]; + TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries); + return build(messages, tableRecordInfo, dictionaries); + } + + private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) { + final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() { + @Nullable + @Override + public TableRecord apply(@Nullable List<String> input) { + TableRecord result = tableRecordInfo.createTableRecord(); + for (int i = 0; i < input.size(); i++) { + result.setValueString(i, input.get(i)); + } + return result; + } + })); + slice.setLocalDictionaries(localDictionary); + return slice; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f5c55d7b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java new file mode 100644 index 0000000..307f73a --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java @@ -0,0 +1,65 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. 
You may obtain a copy of the License at
 *
 *
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 *
 * distributed under the License is distributed on an "AS IS" BASIS,
 *
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *
 * See the License for the specific language governing permissions and
 *
 * limitations under the License.
 *
 * /
 */

package org.apache.kylin.source.kafka;

import com.google.common.collect.Lists;
import kafka.message.MessageAndOffset;
import org.apache.kylin.engine.streaming.StreamingMessage;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Collections;

/**
 * {@link StreamingParser} that treats the Kafka payload as comma-separated text:
 * the payload is decoded as UTF-8 and split on {@code ","} into the message's data columns.
 * No message is ever filtered out.
 */
public final class StringStreamingParser implements StreamingParser {

    public static final StringStreamingParser instance = new StringStreamingParser();

    /** Explicit charset so decoding does not depend on the JVM's platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private StringStreamingParser() {
    }

    /**
     * Decodes the payload of a Kafka message and splits it into columns.
     *
     * @param kafkaMessage the Kafka message together with its offset
     * @return a streaming message holding the comma-separated fields and an empty parameter map
     */
    @Override
    public StreamingMessage parse(MessageAndOffset kafkaMessage) {
        // Work on a duplicate so the read does not move the position of the buffer returned by payload().
        final ByteBuffer payload = kafkaMessage.message().payload().duplicate();
        // remaining() rather than limit(): only the bytes between position and limit are readable,
        // so sizing by limit() underflows whenever the position is not 0.
        final byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        // NOTE(review): String.split(",") drops trailing empty fields ("a,b," -> [a, b]); confirm that is intended.
        final String text = new String(bytes, UTF_8);
        // NOTE(review): the offset is passed twice; presumably the third argument is a timestamp and
        // the offset is only a stand-in for it. Confirm against the StreamingMessage constructor.
        return new StreamingMessage(Lists.newArrayList(text.split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object>emptyMap());
    }

    /**
     * Accepts every message.
     *
     * @return always {@code true}
     */
    @Override
    public boolean filter(StreamingMessage streamingMessage) {
        return true;
    }
}
