Revert "KYLIN-1726 Scalable streaming cubing" This reverts commit 81c7323b633df88eedac8b319fc57f9b62b01a4a.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b444e273 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b444e273 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b444e273 Branch: refs/heads/tempmaster Commit: b444e273986e73e7866cf105347e507556c6932c Parents: dc27d52 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Sep 19 23:55:54 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Sep 19 23:55:54 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 35 +-- .../kylin/job/streaming/Kafka10DataLoader.java | 80 ------- .../apache/kylin/common/KylinConfigBase.java | 1 - .../java/org/apache/kylin/cube/CubeSegment.java | 1 - .../java/org/apache/kylin/cube/ISegment.java | 39 ++++ .../cube/gridtable/SegmentGTStartAndEnd.java | 2 +- .../cube/model/CubeJoinedFlatTableDesc.java | 6 - .../cube/model/CubeJoinedFlatTableEnrich.java | 6 - .../apache/kylin/gridtable/ScannerWorker.java | 2 +- .../metadata/model/IJoinedFlatTableDesc.java | 2 - .../apache/kylin/metadata/model/ISegment.java | 36 --- .../kylin/engine/mr/BatchMergeJobBuilder2.java | 3 - .../org/apache/kylin/engine/mr/IMRInput.java | 10 - .../java/org/apache/kylin/engine/mr/MRUtil.java | 4 - .../test_streaming_table_model_desc.json | 6 +- .../kylin/provision/BuildCubeWithStream.java | 218 +++++------------- .../org/apache/kylin/provision/MockKafka.java | 191 ---------------- .../apache/kylin/provision/NetworkUtils.java | 52 ----- pom.xml | 2 +- .../apache/kylin/source/hive/HiveMRInput.java | 11 - source-kafka/pom.xml | 13 +- .../kylin/source/kafka/KafkaConfigManager.java | 46 ++-- .../apache/kylin/source/kafka/KafkaMRInput.java | 221 ------------------- .../apache/kylin/source/kafka/KafkaSource.java | 57 ----- .../kylin/source/kafka/KafkaStreamingInput.java | 65 +++--- .../kylin/source/kafka/MergeOffsetStep.java | 89 -------- .../kylin/source/kafka/SeekOffsetStep.java | 119 ---------- .../kylin/source/kafka/StreamingParser.java | 49 ++-- .../source/kafka/StringStreamingParser.java | 49 ++-- .../source/kafka/TimedJsonStreamParser.java | 49 ++-- .../apache/kylin/source/kafka/TopicMeta.java | 49 ++-- .../kylin/source/kafka/UpdateTimeRangeStep.java | 108 --------- .../source/kafka/config/KafkaClusterConfig.java | 3 +- .../source/kafka/hadoop/KafkaFlatTableJob.java | 165 -------------- .../kafka/hadoop/KafkaFlatTableMapper.java | 51 ----- .../source/kafka/hadoop/KafkaInputFormat.java | 98 -------- .../kafka/hadoop/KafkaInputRecordReader.java | 166 -------------- .../source/kafka/hadoop/KafkaInputSplit.java | 102 --------- .../kylin/source/kafka/util/KafkaClient.java | 115 ---------- .../source/kafka/util/KafkaOffsetMapping.java | 97 -------- .../kylin/source/kafka/util/KafkaRequester.java | 56 +++-- .../kylin/source/kafka/util/KafkaUtils.java | 3 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 2 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +- 45 files changed, 348 insertions(+), 2135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 9b282e3..8c64f91 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -143,12 +143,14 @@ public class DeployUtil { deployHiveTables(); } - public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException { + public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); - List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getFactTable()); + List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable()); + List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable()); TableDesc tableDesc = cubeInstance.getFactTableDesc(); //load into kafka streamDataLoader.loadIntoKafka(data); + streamDataLoader.loadIntoKafka(data2); logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString()); //csv data for H2 use @@ -163,7 +165,7 @@ public class DeployUtil { sb.append(StringUtils.join(rowColumns, ",")); sb.append(System.getProperty("line.separator")); } - appendFactTableData(sb.toString(), cubeInstance.getFactTable()); + overrideFactTableData(sb.toString(), cubeInstance.getFactTable()); } public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException { @@ -177,33 +179,6 @@ public class DeployUtil { in.close(); } - public static void appendFactTableData(String factTableContent, String factTableName) throws IOException { - // Write to resource store - ResourceStore store = ResourceStore.getStore(config()); - - InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8")); - String factTablePath = "/data/" + factTableName + ".csv"; - - File tmpFile = File.createTempFile(factTableName, "csv"); - FileOutputStream out = new FileOutputStream(tmpFile); - - try { - if (store.exists(factTablePath)) { - InputStream oldContent = store.getResource(factTablePath).inputStream; - IOUtils.copy(oldContent, out); - } - IOUtils.copy(in, out); - IOUtils.closeQuietly(in); - - store.deleteResource(factTablePath); - in = new FileInputStream(tmpFile); - store.putResource(factTablePath, in, System.currentTimeMillis()); - } finally { - IOUtils.closeQuietly(out); - IOUtils.closeQuietly(in); - } - - } private static void deployHiveTables() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java deleted file mode 100644 index a5132af..0000000 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job.streaming; - -import java.util.List; -import java.util.Properties; - -import javax.annotation.Nullable; - -import org.apache.commons.lang.StringUtils; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kylin.source.kafka.config.BrokerConfig; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.collect.Collections2; - -import org.apache.kylin.source.kafka.util.KafkaClient; - -/** - * Load prepared data into kafka(for test use) - */ -public class Kafka10DataLoader extends StreamDataLoader { - private static final Logger logger = LoggerFactory.getLogger(Kafka10DataLoader.class); - List<KafkaClusterConfig> kafkaClusterConfigs; - - public Kafka10DataLoader(KafkaConfig kafkaConfig) { - super(kafkaConfig); - this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs(); - } - - public void loadIntoKafka(List<String> messages) { - - KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0); - String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { - @Nullable - @Override - public String apply(BrokerConfig brokerConfig) { - return brokerConfig.getHost() + ":" + brokerConfig.getPort(); - } - }), ","); - - Properties props = new Properties(); - props.put("acks", "1"); - props.put("retry.backoff.ms", "1000"); - KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props); - - int boundary = messages.size() / 10; - for (int i = 0; i < messages.size(); ++i) { - ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); - producer.send(keyedMessage); - if (i % boundary == 0) { - logger.info("sending " + i + " messages to " + this.toString()); - } - } - logger.info("sent " + messages.size() + " messages to " + this.toString()); - producer.close(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index fafb1fc..79ee084 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -717,7 +717,6 @@ abstract public class KylinConfigBase implements Serializable { Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine.")); // ref constants in ISourceAware r.put(0, "org.apache.kylin.source.hive.HiveSource"); - r.put(1, "org.apache.kylin.source.kafka.KafkaSource"); return r; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index afb0d28..79397c3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -37,7 +37,6 @@ import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealization; http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java new file mode 100644 index 0000000..2e1f214 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube; + +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; + +public interface ISegment { + + public String getName(); + + public long getDateRangeStart(); + + public long getDateRangeEnd(); + + public long getSourceOffsetStart(); + + public long getSourceOffsetEnd(); + + public DataModelDesc getModel(); + + public SegmentStatusEnum getStatus(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java index 889a0b2..e31111d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java @@ -24,7 +24,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.dimension.AbstractDateDimEnc; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.metadata.datatype.DataType; http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index 6ca89c8..6aeb617 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -26,7 +26,6 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -163,9 +162,4 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { return cubeDesc.getDistributedByColumn(); } - @Override - public ISegment getSegment() { - return cubeSegment; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index 8af2297..5212859 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -25,7 +25,6 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -138,9 +137,4 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { return flatDesc.getDistributedBy(); } - @Override - public ISegment getSegment() { - return flatDesc.getSegment(); - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java index 4213cf3..bb7503a 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Iterator; -import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index ffa2680..f3a4107 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -37,6 +37,4 @@ public interface IJoinedFlatTableDesc { long getSourceOffsetEnd(); TblColRef getDistributedBy(); - - ISegment getSegment(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java deleted file mode 100644 index f69ae3f..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.metadata.model; - -public interface ISegment { - - public String getName(); - - public long getDateRangeStart(); - - public long getDateRangeEnd(); - - public long getSourceOffsetStart(); - - public long getSourceOffsetEnd(); - - public DataModelDesc getModel(); - - public SegmentStatusEnum getStatus(); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index badf628..129d525 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -34,12 +34,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class); private final IMROutput2.IMRBatchMergeOutputSide2 outputSide; - private final IMRInput.IMRBatchMergeInputSide inputSide; public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) { super(mergeSegment, submitter); this.outputSide = MRUtil.getBatchMergeOutputSide2(seg); - this.inputSide = MRUtil.getBatchMergeInputSide(seg); } public CubingJob build() { @@ -57,7 +55,6 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { } // Phase 1: Merge Dictionary - inputSide.addStepPhase1_MergeDictionary(result); result.addTask(createMergeDictionaryStep(mergingSegmentIds)); result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId))); outputSide.addStepPhase1_MergeDictionary(result); http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index 62cede9..582052f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr; import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.TableDesc; /** @@ -35,9 +34,6 @@ public interface IMRInput { /** Return an InputFormat that reads from specified table. */ public IMRTableInputFormat getTableInputFormat(TableDesc table); - /** Return a helper to participate in batch cubing merge job flow. */ - public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg); - /** * Utility that configures mapper to read from a table. */ @@ -71,10 +67,4 @@ public interface IMRInput { public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow); } - public interface IMRBatchMergeInputSide { - - /** Add step that executes before merge dictionary and before merge cube. */ - public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow); - - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 67eef5e..2c3b77f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -71,10 +71,6 @@ public class MRUtil { return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg); } - public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) { - return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg); - } - // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe public static int runMRJob(Tool tool, String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json index e6977e1..cfb889a 100644 --- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json +++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json @@ -4,7 +4,7 @@ "name": "test_streaming_table_model_desc", "dimensions": [ { - "table": "DEFAULT.STREAMING_TABLE", + "table": "default.streaming_table", "columns": [ "minute_start", "hour_start", @@ -20,10 +20,10 @@ "item_count" ], "last_modified": 0, - "fact_table": "DEFAULT.STREAMING_TABLE", + "fact_table": "default.streaming_table", "filter_condition": null, "partition_desc": { - "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start", + "partition_date_column": "default.streaming_table.minute_start", "partition_date_start": 0, "partition_type": "APPEND" } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 7f79acc..9490560 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,36 +20,24 @@ package org.apache.kylin.provision; import java.io.File; import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.TimeZone; import java.util.UUID; -import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.job.streaming.Kafka10DataLoader; +import org.apache.kylin.job.streaming.KafkaDataLoader; +import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; -import org.junit.Assert; +import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,123 +46,31 @@ import org.slf4j.LoggerFactory; */ public class BuildCubeWithStream { - private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class); - - private CubeManager cubeManager; - private DefaultScheduler scheduler; - protected ExecutableManager jobService; + private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class); private static final String cubeName = "test_streaming_table_cube"; + private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00"); + private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00"); + private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours - private KafkaConfig kafkaConfig; - private MockKafka kafkaServer; - - public void before() throws Exception { - deployEnv(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - final String factTable = cubeInstance.getFactTable(); - - final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); - final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable); - kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName()); - - String topicName = UUID.randomUUID().toString(); - String localIp = NetworkUtils.getLocalIp(); - BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0); - brokerConfig.setHost(localIp); - kafkaConfig.setTopic(topicName); - KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig); - - startEmbeddedKafka(topicName, brokerConfig); - } - - private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { - //Start mock Kakfa - String zkConnectionStr = "sandbox:2181"; - ZkConnection zkConnection = new ZkConnection(zkConnectionStr); - // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); - kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); - kafkaServer.start(); - - kafkaServer.createTopic(topicName, 3, 1); - kafkaServer.waitTopicUntilReady(topicName); - - MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName); - Assert.assertEquals(topicName, topicMetadata.topic()); - } - - private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { - Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); - logger.info("Test data inserted into Kafka"); - } + private KylinConfig kylinConfig; - private void clearSegment(String cubeName) throws Exception { - CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); - } - - public void build() throws Exception { - clearSegment(cubeName); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long date1 = 0; - long date2 = f.parse("2013-01-01").getTime(); - - int numberOfRecrods1 = 10000; - generateStreamData(date1, date2, numberOfRecrods1); - buildSegment(cubeName, 0, Long.MAX_VALUE); - - long date3 = f.parse("2013-04-01").getTime(); - int numberOfRecrods2 = 5000; - generateStreamData(date2, date3, numberOfRecrods2); - buildSegment(cubeName, 0, Long.MAX_VALUE); - - //merge - mergeSegment(cubeName, 0, 15000); - - } - - private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true); - DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getId(); - } + public static void main(String[] args) throws Exception { - private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); - DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getId(); - } + try { + beforeClass(); - private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); - DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getId(); - } + BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream(); + buildCubeWithStream.before(); + buildCubeWithStream.build(); + logger.info("Build is done"); + buildCubeWithStream.cleanup(); + logger.info("Going to exit"); + System.exit(0); + } catch (Exception e) { + logger.error("error", e); + System.exit(1); + } - protected void deployEnv() throws IOException { - DeployUtil.overrideJobJarLocations(); - //DeployUtil.initCliWorkDir(); - //DeployUtil.deployMetadata(); } public static void beforeClass() throws Exception { @@ -187,54 +83,44 @@ public class BuildCubeWithStream { HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); } - public static void afterClass() throws Exception { - HBaseMetadataTestCase.staticCleanupTestMetadata(); + protected void deployEnv() throws IOException { + DeployUtil.overrideJobJarLocations(); } - public void after() { - kafkaServer.stop(); - } + public void before() throws Exception { + deployEnv(); - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { - break; - } else { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } + kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final String factTable = cubeInstance.getFactTable(); + final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable); - public static void main(String[] args) throws Exception { - try { - beforeClass(); + //Use a random topic for kafka data stream + KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName()); + streamingConfig.setTopic(UUID.randomUUID().toString()); + KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); - BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream(); - buildCubeWithStream.before(); - buildCubeWithStream.build(); - logger.info("Build is done"); - buildCubeWithStream.after(); - afterClass(); - logger.info("Going to exit"); - System.exit(0); - } catch (Exception e) { - logger.error("error", e); - System.exit(1); - } + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig)); + } + public void cleanup() throws Exception { + cleanupOldStorage(); + HBaseMetadataTestCase.staticCleanupTestMetadata(); } protected int cleanupOldStorage() throws Exception { String[] args = { "--delete", "true" }; - // KapStorageCleanupCLI cli = new KapStorageCleanupCLI(); - // cli.execute(args); + StorageCleanupJob cli = new StorageCleanupJob(); + cli.execute(args); return 0; } + public void build() throws Exception { + logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); + for (long start = startTime; start < endTime; start += batchInterval) { + logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval)); + new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run(); + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java deleted file mode 100644 index 3f47923..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.provision; - -import java.io.UnsupportedEncodingException; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.UUID; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; -import org.I0Itec.zkclient.exception.ZkMarshallingError; -import org.I0Itec.zkclient.serialize.ZkSerializer; -import org.apache.kafka.common.requests.MetadataResponse; - -import kafka.admin.AdminUtils; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; -import kafka.utils.ZkUtils; - -public class MockKafka { - private static Properties createProperties(ZkConnection zkServerConnection, String logDir, String port, String brokerId) { - Properties properties = new Properties(); - properties.put("port", port); - properties.put("broker.id", brokerId); - properties.put("log.dirs", logDir); - properties.put("host.name", "localhost"); - properties.put("offsets.topic.replication.factor", "1"); - properties.put("delete.topic.enable", "true"); - properties.put("zookeeper.connect", zkServerConnection.getServers()); - String ip = NetworkUtils.getLocalIp(); - properties.put("listeners", "PLAINTEXT://" + ip + ":" + port); - properties.put("advertised.listeners", "PLAINTEXT://" + ip + ":" + port); - return properties; - } - - private KafkaServerStartable kafkaServer; - - private ZkConnection zkConnection; - - public MockKafka(ZkConnection zkServerConnection) { - this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), "9092", "1"); - start(); - } - - private MockKafka(Properties properties) { - KafkaConfig kafkaConfig = new KafkaConfig(properties); - kafkaServer = new KafkaServerStartable(kafkaConfig); - } - - public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) { - this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId)); - start(); - } - - private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) { - this(createProperties(zkServerConnection, logDir, port, brokerId)); - this.zkConnection = zkServerConnection; - System.out.println(String.format("Kafka %s:%s dir:%s", kafkaServer.serverConfig().brokerId(), kafkaServer.serverConfig().port(), kafkaServer.serverConfig().logDirs())); - } - - public void createTopic(String topic, int partition, int replication) { - ZkClient zkClient = new ZkClient(zkConnection); - ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); - zkClient.setZkSerializer(new ZKStringSerializer()); - AdminUtils.createTopic(zkUtils, topic, partition, replication, new Properties(), null); - zkClient.close(); - } - - public void createTopic(String topic) { - this.createTopic(topic, 1, 1); - } - - public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) { - ZkClient zkClient = new ZkClient(zkConnection); - ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); - zkClient.setZkSerializer(new ZKStringSerializer()); - MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils); - zkClient.close(); - return topicMetadata; - } - - /** - * Delete may not work - * - * @param topic - */ - public void deleteTopic(String topic) { - ZkClient zkClient = new ZkClient(zkConnection); - ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); - zkClient.setZkSerializer(new ZKStringSerializer()); - AdminUtils.deleteTopic(zkUtils, topic); - zkClient.close(); - } - - public String getConnectionString() { - return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port()); - } - - public void start() { - kafkaServer.startup(); - System.out.println("embedded kafka is up"); - } - - public void stop() { - kafkaServer.shutdown(); - System.out.println("embedded kafka down"); - } - - public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) { - boolean isReady = false; - MetadataResponse.TopicMetadata topicMeta = null; - while (!isReady) { - Random random = new Random(); - topicMeta = this.fetchTopicMeta(topic); - List<MetadataResponse.PartitionMetadata> partitionsMetadata = topicMeta.partitionMetadata(); - Iterator<MetadataResponse.PartitionMetadata> iterator = partitionsMetadata.iterator(); - boolean hasGotLeader = true; - boolean hasGotReplica = true; - while (iterator.hasNext()) { - MetadataResponse.PartitionMetadata partitionMeta = iterator.next(); - hasGotLeader &= (!partitionMeta.leader().isEmpty()); - if (partitionMeta.leader().isEmpty()) { - System.out.println("Partition leader is not ready, wait 1s."); - break; - } - hasGotReplica &= (!partitionMeta.replicas().isEmpty()); - if (partitionMeta.replicas().isEmpty()) { - System.out.println("Partition replica is not ready, wait 1s."); - break; - } - } - isReady = hasGotLeader & hasGotReplica; - if (!isReady) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } - } - return topicMeta; - } - - public String getZookeeperConnection() { - return this.zkConnection.getServers(); - } -} - -class ZKStringSerializer implements ZkSerializer { - - @Override - public byte[] serialize(Object data) throws ZkMarshallingError { - byte[] bytes = null; - try { - bytes = data.toString().getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new ZkMarshallingError(e); - } - return bytes; - } - - @Override - public Object deserialize(byte[] bytes) throws ZkMarshallingError { - if (bytes == null) - return null; - else - try { - return new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new ZkMarshallingError(e); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java deleted file mode 100644 index 98f6d04..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.provision; - -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Enumeration; - -public class NetworkUtils { - - public static String getLocalIp() { - try { - Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); - while (interfaces.hasMoreElements()) { - NetworkInterface iface = interfaces.nextElement(); - if (iface.isLoopback() || !iface.isUp() || iface.isVirtual() || iface.isPointToPoint()) - continue; - if (iface.getName().startsWith("vboxnet")) - continue; - - Enumeration<InetAddress> addresses = iface.getInetAddresses(); - while (addresses.hasMoreElements()) { - InetAddress addr = addresses.nextElement(); - final String ip = addr.getHostAddress(); - if (Inet4Address.class == addr.getClass()) - return ip; - } - } - } catch (SocketException e) { - throw new RuntimeException(e); - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 30d3324..1abc4eb 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ <!-- HBase versions --> <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version> - <kafka.version>0.10.0.0</kafka.version> + <kafka.version>0.8.1</kafka.version> <!-- Hadoop deps, keep compatible with hadoop2.version --> <zookeeper.version>3.4.6</zookeeper.version> http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 09ac522..520d7cc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -50,7 +50,6 @@ import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.LookupDesc; import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; @@ -70,16 +69,6 @@ public class HiveMRInput implements IMRInput { return new HiveTableInputFormat(table.getIdentity()); } - @Override - public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) { - return new IMRBatchMergeInputSide() { - @Override - public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) { - // doing nothing - } - }; - } - public static class HiveTableInputFormat implements IMRTableInputFormat { final String dbName; final String tableName; http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index 212f4c6..90c2211 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -32,11 +32,10 @@ </parent> + <properties> + </properties> + <dependencies> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-mr</artifactId> - </dependency> <dependency> <groupId>org.apache.kylin</groupId> @@ -61,10 +60,16 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> + </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index cfdf316..d594873 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -1,19 +1,35 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / */ package org.apache.kylin.source.kafka; http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java deleted file mode 100644 index cfce137..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.kafka; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.engine.mr.IMRInput; -import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.MapReduceExecutable; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.JoinedFlatTable; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.kafka.config.KafkaConfig; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -public class KafkaMRInput implements IMRInput { - - CubeSegment cubeSegment; - - @Override - public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { - this.cubeSegment = (CubeSegment)flatDesc.getSegment(); - return new BatchCubingInputSide(cubeSegment); - } - - @Override - public IMRTableInputFormat getTableInputFormat(TableDesc table) { - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity()); - List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() { - @Nullable - @Override - public TblColRef apply(ColumnDesc input) { - return input.getRef(); - } - }); - - return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null); - } - - @Override - public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) { - return new KafkaMRBatchMergeInputSide((CubeSegment) seg); - } - - public static class KafkaTableInputFormat implements IMRTableInputFormat { - private final CubeSegment cubeSegment; - private List<TblColRef> columns; - private StreamingParser streamingParser; - private KafkaConfig kafkaConfig; - private final JobEngineConfig conf; - - public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { - this.cubeSegment = cubeSegment; - this.columns = columns; - this.kafkaConfig = kafkaConfig; - this.conf = conf; - } - - @Override - public void configureJob(Job job) { - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapOutputValueClass(Text.class); - String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID); - IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); - String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); - try { - FileInputFormat.addInputPath(job, new Path(inputPath)); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - - @Override - public String[] parseMapperInput(Object mapperInput) { - if (streamingParser == null) { - try { - streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); - } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException(); - } - } - Text text = (Text) mapperInput; - ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice(); - StreamingMessage streamingMessage = streamingParser.parse(buffer); - return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]); - } - - } - - public static class BatchCubingInputSide implements IMRBatchCubingInputSide { - - final JobEngineConfig conf; - final CubeSegment seg; - private String outputPath; - - public BatchCubingInputSide(CubeSegment seg) { - this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); - this.seg = seg; - } - - @Override - public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId())); - jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId())); - } - - public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) { - final SeekOffsetStep result = new SeekOffsetStep(); - result.setName("Seek and update offset step"); - - CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); - CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); - CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); - - return result; - } - - private MapReduceExecutable createSaveKafkaDataStep(String jobId) { - MapReduceExecutable result = new MapReduceExecutable(); - - IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg); - outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); - result.setName("Save data from Kafka"); - result.setMapReduceJobClass(KafkaFlatTableJob.class); - JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system"); - StringBuilder cmd = new StringBuilder(); - jobBuilderSupport.appendMapReduceParameters(cmd); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); - - result.setMapReduceParams(cmd.toString()); - return result; - } - - @Override - public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - final UpdateTimeRangeStep result = new UpdateTimeRangeStep(); - result.setName("Update Segment Time Range"); - CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); - CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); - CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams()); - JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM"); - result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId())); - jobFlow.addTask(result); - - } - - @Override - public IMRTableInputFormat getFlatTableInputFormat() { - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getRealization().getFactTable()); - List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns(); - - return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf); - - } - - } - - class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide { - - private CubeSegment cubeSegment; - - KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) { - this.cubeSegment = cubeSegment; - } - - @Override - public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) { - - final MergeOffsetStep result = new MergeOffsetStep(); - result.setName("Merge offset step"); - - CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams()); - CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams()); - CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams()); - jobFlow.addTask(result); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java deleted file mode 100644 index d039583..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.source.kafka; - -import com.google.common.collect.Lists; -import org.apache.kylin.engine.mr.IMRInput; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ISource; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.kafka.config.KafkaConfig; - -import java.util.List; - -//used by reflection -public class KafkaSource implements ISource { - - @SuppressWarnings("unchecked") - @Override - public <I> I adaptToBuildEngine(Class<I> engineInterface) { - if (engineInterface == IMRInput.class) { - return (I) new KafkaMRInput(); - } else { - throw new RuntimeException("Cannot adapt to " + engineInterface); - } - } - - @Override - public ReadableTable createReadableTable(TableDesc tableDesc) { - throw new UnsupportedOperationException(); - } - - @Override - public List<String> getMRDependentResources(TableDesc table) { - List<String> dependentResources = Lists.newArrayList(); - dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity())); - dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); - return dependentResources; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index de42689..c3bdb75 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -1,20 +1,36 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ package org.apache.kylin.source.kafka; import java.util.List; @@ -24,9 +40,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.google.common.base.Function; -import kafka.cluster.BrokerEndPoint; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StreamingBatch; @@ -53,8 +66,6 @@ import kafka.javaapi.FetchResponse; import kafka.javaapi.PartitionMetadata; import kafka.message.MessageAndOffset; -import javax.annotation.Nullable; - @SuppressWarnings("unused") public class KafkaStreamingInput implements IStreamingInput { @@ -140,16 +151,8 @@ public class KafkaStreamingInput implements IStreamingInput { if (partitionMetadata.errorCode() != 0) { logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode()); } - replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() { - @Nullable - @Override - public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) { - return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT); - } - }); - BrokerEndPoint leaderEndpoint = partitionMetadata.leader(); - - return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT); + replicaBrokers = partitionMetadata.replicas(); + return partitionMetadata.leader(); } else { return null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java deleted file mode 100644 index a21b980..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.kafka; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Maps; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; - -/** - */ -public class MergeOffsetStep extends AbstractExecutable { - - private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class); - public MergeOffsetStep() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); - - List<CubeSegment> mergingSegs = cube.getMergingSegments(segment); - Map<Integer, Long> mergedStartOffsets = Maps.newHashMap(); - Map<Integer, Long> mergedEndOffsets = Maps.newHashMap(); - - long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0; - for (CubeSegment seg: mergingSegs) { - Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(seg); - Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(seg); - - for (Integer partition : startOffsets.keySet()) { - long currentStart = mergedStartOffsets.get(partition) != null ? Long.valueOf(mergedStartOffsets.get(partition)) : Long.MAX_VALUE; - long currentEnd = mergedEndOffsets.get(partition) != null ? Long.valueOf(mergedEndOffsets.get(partition)) : 0; - mergedStartOffsets.put(partition, Math.min(currentStart, startOffsets.get(partition))); - mergedEndOffsets.put(partition, Math.max(currentEnd, endOffsets.get(partition))); - } - dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart()); - dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd()); - } - - KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets); - KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets); - segment.setDateRangeStart(dateRangeStart); - segment.setDateRangeEnd(dateRangeEnd); - - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(segment); - try { - cubeManager.updateCube(cubeBuilder); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - logger.error("fail to update cube segment offset", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java deleted file mode 100644 index 5dca93f..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.kafka; - -import org.apache.kylin.source.kafka.util.KafkaClient; -import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - */ -public class SeekOffsetStep extends AbstractExecutable { - - private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class); - - public SeekOffsetStep() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); - - Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment); - Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment); - - if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) { - return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided."); - } - - final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable()); - final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); - final String topic = kafakaConfig.getTopic(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { - final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); - - if (startOffsets.isEmpty()) { - // user didn't specify start offset, use the biggest offset in existing segments as start - for (CubeSegment seg : cube.getSegments()) { - Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg); - for (PartitionInfo partition : partitionInfos) { - int partitionId = partition.partition(); - if (segEndOffset.containsKey(partitionId)) { - startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId))); - } - } - } - - if (partitionInfos.size() > startOffsets.size()) { - // has new partition added - for (int x = startOffsets.size(); x < partitionInfos.size(); x++) { - long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); - startOffsets.put(partitionInfos.get(x).partition(), earliest); - } - } - - logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString()); - } - - if (endOffsets.isEmpty()) { - // user didn't specify end offset, use latest offset in kafka - for (PartitionInfo partitionInfo : partitionInfos) { - long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition()); - endOffsets.put(partitionInfo.partition(), latest); - } - - logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString()); - } - } - - KafkaOffsetMapping.saveOffsetStart(segment, startOffsets); - KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets); - - segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd())); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(segment); - try { - cubeManager.updateCube(cubeBuilder); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - logger.error("fail to update cube segment offset", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 6b7d658..cb6a72b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -1,20 +1,37 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + package org.apache.kylin.source.kafka; import java.lang.reflect.Constructor; http://git-wip-us.apache.org/repos/asf/kylin/blob/b444e273/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java index 666297f..8888d67 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java @@ -1,20 +1,37 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + package org.apache.kylin.source.kafka; import java.nio.ByteBuffer;