Revert "KYLIN-1726 use segment uuid instead of name"

This reverts commit 42dafc15db40731582d6257c618eff29643930a8.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dc27d52f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dc27d52f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dc27d52f Branch: refs/heads/tempmaster Commit: dc27d52fd676be666ea2778c11b58be05d3cfd6d Parents: 3f28027 Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Sep 19 23:51:57 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Sep 19 23:51:57 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 5 +--- .../kylin/provision/BuildCubeWithStream.java | 26 +++----------------- .../apache/kylin/source/kafka/KafkaMRInput.java | 2 +- .../source/kafka/hadoop/KafkaFlatTableJob.java | 11 ++++++--- .../kafka/hadoop/KafkaInputRecordReader.java | 9 +++---- 5 files changed, 17 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index fc68798..daeca0d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -444,11 +444,8 @@ public class CubeManager implements IRealizationProvider { updateCube(cubeBuilder); return newSegment; } - public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { - return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true); - } - public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean 
strictChecking) throws IOException { + public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { checkNoBuildingSegment(cube); CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 9e779ab..7f79acc 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -21,8 +21,6 @@ package org.apache.kylin.provision; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.List; import java.util.TimeZone; import java.util.UUID; @@ -147,34 +145,18 @@ public class BuildCubeWithStream { //merge mergeSegment(cubeName, 0, 15000); - List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(); - Assert.assertTrue(segments.size() == 1); - - CubeSegment toRefreshSeg = segments.get(0); - HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo(); - - refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap); - segments = cubeManager.getCube(cubeName).getSegments(); - Assert.assertTrue(segments.size() == 1); - } private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, 
true); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); return job.getId(); } - private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception { - CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); - segment.setAdditionalInfo(partitionOffsetMap); - CubeInstance cubeInstance = cubeManager.getCube(cubeName); - CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeBuilder.setToUpdateSegs(segment); - cubeManager.updateCube(cubeBuilder); - segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid()); + private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception { + CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); @@ -182,7 +164,7 @@ public class BuildCubeWithStream { } private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); + CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 
a5f678f..cfce137 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput { jobBuilderSupport.appendMapReduceParameters(cmd); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); result.setMapReduceParams(cmd.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index 87d2471..decfb60 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; @@ -69,14 +70,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob 
{ options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_SEGMENT_NAME); parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); String cubeName = getOptionValue(OPTION_CUBE_NAME); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String segmentId = getOptionValue(OPTION_SEGMENT_ID); + String segmentName = getOptionValue(OPTION_SEGMENT_NAME); // ---------------------------------------------------------------------------- // add metadata to distributed cache @@ -84,7 +85,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { CubeInstance cube = cubeMgr.getCube(cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); @@ -103,9 +104,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name - setupMapper(cube.getSegmentById(segmentId)); + setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW)); job.setNumReduceTasks(0); FileOutputFormat.setOutputPath(job, output); FileOutputFormat.setCompressOutput(job, true); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java index 6774c9d..f67fef5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -105,11 +105,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit value = new BytesWritable(); } - if (watermark >= latestOffset) { - log.info("Reach the end offset, stop reading."); - return false; - } - if (messages == null) { log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark); TopicPartition topicPartition = new TopicPartition(topic, partition); @@ -124,6 +119,10 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit if (iterator.hasNext()) { ConsumerRecord<String, String> message = iterator.next(); + if (message.offset() >= latestOffset) { + log.info("Reach the end offset, stop reading."); + return false; + } key.set(message.offset()); byte[] valuebytes = Bytes.toBytes(message.value()); value.set(valuebytes, 0, valuebytes.length);