Revert "KYLIN-1726 use segment uuid instead of name"

This reverts commit 42dafc15db40731582d6257c618eff29643930a8.
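This revert restores segment-name-based lookup in the Kafka MR jobs
(ARG_SEGMENT_NAME / CFG_CUBE_SEGMENT_NAME in place of the UUID-based
variants), drops the strictChecking overload of CubeManager.refreshSegment(),
and moves the end-offset check in KafkaInputRecordReader from the watermark
onto the fetched message offset.

As a minimal sketch of the two lookup styles this commit switches between
(only the two CubeInstance calls are taken from the diff below; the wrapper
class and method names are illustrative, not part of the commit):

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.metadata.model.SegmentStatusEnum;

    class SegmentLookupSketch {
        // After the revert: resolve the building segment by name + status.
        static CubeSegment byName(CubeInstance cube, String segmentName) {
            return cube.getSegment(segmentName, SegmentStatusEnum.NEW);
        }

        // Before the revert (KYLIN-1726): resolve it by its UUID.
        static CubeSegment byUuid(CubeInstance cube, String segmentId) {
            return cube.getSegmentById(segmentId);
        }
    }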


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dc27d52f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dc27d52f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dc27d52f

Branch: refs/heads/tempmaster
Commit: dc27d52fd676be666ea2778c11b58be05d3cfd6d
Parents: 3f28027
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Sep 19 23:51:57 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Sep 19 23:51:57 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |  5 +---
 .../kylin/provision/BuildCubeWithStream.java    | 26 +++-----------------
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../source/kafka/hadoop/KafkaFlatTableJob.java  | 11 ++++++---
 .../kafka/hadoop/KafkaInputRecordReader.java    |  9 +++----
 5 files changed, 17 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fc68798..daeca0d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -444,11 +444,8 @@ public class CubeManager implements IRealizationProvider {
         updateCube(cubeBuilder);
         return newSegment;
     }
-    public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
-        return refreshSegment(cube, startDate, endDate, startOffset, endOffset, true);
-    }
 
-    public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, boolean strictChecking) throws IOException {
+    public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException {
         checkNoBuildingSegment(cube);
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);

http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 9e779ab..7f79acc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -21,8 +21,6 @@ package org.apache.kylin.provision;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.List;
 import java.util.TimeZone;
 import java.util.UUID;
 
@@ -147,34 +145,18 @@ public class BuildCubeWithStream {
         //merge
         mergeSegment(cubeName, 0, 15000);
 
-        List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
-        Assert.assertTrue(segments.size() == 1);
-
-        CubeSegment toRefreshSeg = segments.get(0);
-        HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
-
-        refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
-        segments = cubeManager.getCube(cubeName).getSegments();
-        Assert.assertTrue(segments.size() == 1);
-
     }
 
     private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+        CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true);
         DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
         return job.getId();
     }
 
-    private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception {
-        CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
-        segment.setAdditionalInfo(partitionOffsetMap);
-        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-        cubeBuilder.setToUpdateSegs(segment);
-        cubeManager.updateCube(cubeBuilder);
-        segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid());
+    private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception {
+        CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
@@ -182,7 +164,7 @@ public class BuildCubeWithStream {
     }
 
     private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false);
+        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index a5f678f..cfce137 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -165,7 +165,7 @@ public class KafkaMRInput implements IMRInput {
             jobBuilderSupport.appendMapReduceParameters(cmd);
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+            JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
             JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
 
             result.setMapReduceParams(cmd.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index 87d2471..decfb60 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
@@ -69,14 +70,14 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_SEGMENT_NAME);
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 
-            String segmentId = getOptionValue(OPTION_SEGMENT_ID);
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
 
             // ----------------------------------------------------------------------------
             // add metadata to distributed cache
@@ -84,7 +85,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -103,9 +104,11 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
             job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
             job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
             job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
             job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name
-            setupMapper(cube.getSegmentById(segmentId));
+            setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
             job.setNumReduceTasks(0);
             FileOutputFormat.setOutputPath(job, output);
             FileOutputFormat.setCompressOutput(job, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/dc27d52f/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
index 6774c9d..f67fef5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java
@@ -105,11 +105,6 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
             value = new BytesWritable();
         }
 
-        if (watermark >= latestOffset) {
-            log.info("Reach the end offset, stop reading.");
-            return false;
-        }
-
         if (messages == null) {
             log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark);
             TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -124,6 +119,10 @@ public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWrit
 
         if (iterator.hasNext()) {
             ConsumerRecord<String, String> message = iterator.next();
+            if (message.offset() >= latestOffset) {
+                log.info("Reach the end offset, stop reading.");
+                return false;
+            }
             key.set(message.offset());
             byte[] valuebytes = Bytes.toBytes(message.value());
             value.set(valuebytes, 0, valuebytes.length);
