Repository: kylin
Updated Branches:
  refs/heads/v1.6.0-rc1 c43f38f48 -> 516749e45 (forced update)


KYLIN-1726 KafkaSource may get the wrong start offset when a partition is added.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/516749e4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/516749e4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/516749e4

Branch: refs/heads/v1.6.0-rc1
Commit: 516749e457b00b9ab1260fb1150fd436cff33a61
Parents: 5d166aa
Author: shaofengshi <shaofeng...@apache.org>
Authored: Wed Nov 9 09:28:09 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Thu Nov 10 10:47:31 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/SourcePartition.java    | 16 +++++++--
 .../apache/kylin/source/kafka/KafkaSource.java  | 38 ++++++++++++++++----
 .../kylin/source/kafka/util/KafkaClient.java    |  5 +--
 3 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/516749e4/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java 
b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
index 8ba749d..e489704 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -18,8 +18,11 @@
 
 package org.apache.kylin.source;
 
+import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.base.Objects;
+
 /**
  */
 public class SourcePartition {
@@ -90,14 +93,23 @@ public class SourcePartition {
         this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
     }
 
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("startDate", 
startDate).add("endDate", endDate).add("startOffset", 
startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", 
sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", 
sourcePartitionOffsetEnd.toString()).toString();
+    }
+
     public static SourcePartition getCopyOf(SourcePartition origin) {
         SourcePartition copy = new SourcePartition();
         copy.setStartDate(origin.getStartDate());
         copy.setEndDate(origin.getEndDate());
         copy.setStartOffset(origin.getStartOffset());
         copy.setEndOffset(origin.getEndOffset());
-        
copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart());
-        copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd());
+        if (origin.getSourcePartitionOffsetStart() != null) {
+            copy.setSourcePartitionOffsetStart(new 
HashMap<>(origin.getSourcePartitionOffsetStart()));
+        }
+        if (origin.getSourcePartitionOffsetEnd() != null) {
+            copy.setSourcePartitionOffsetEnd(new 
HashMap<>(origin.getSourcePartitionOffsetEnd()));
+        }
         return copy;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/516749e4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 7a5d94f..b0c8e7f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
@@ -34,12 +35,16 @@ import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
 //used by reflection
 public class KafkaSource implements ISource {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(KafkaSource.class);
+
     @SuppressWarnings("unchecked")
     @Override
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
@@ -71,32 +76,50 @@ public class KafkaSource implements ISource {
         if (result.getStartOffset() == 0) {
             final CubeSegment last = cube.getLastSegment();
             if (last != null) {
+                logger.debug("Last segment exists, continue from last segment 
" + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
                 // from last seg's end position
                 
result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
             } else if (cube.getDescriptor().getPartitionOffsetStart() != null 
&& cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+                logger.debug("Last segment doesn't exist, use the start offset 
that be initiated previously: " + 
cube.getDescriptor().getPartitionOffsetStart());
                 
result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
                 // from the topic's very begining;
+                logger.debug("Last segment doesn't exist, and didn't initiate 
the start offset, will seek from topic's very beginning.");
                 
result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }
 
-        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable());
+        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
         try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
             final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic);
-            if (partitionInfos.size() > 
result.getSourcePartitionOffsetStart().size()) {
-                // has new partition added
-                for (int x = result.getSourcePartitionOffsetStart().size(); x 
< partitionInfos.size(); x++) {
-                    long earliest = KafkaClient.getEarliestOffset(consumer, 
topic, partitionInfos.get(x).partition());
-                    
result.getSourcePartitionOffsetStart().put(partitionInfos.get(x).partition(), 
earliest);
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                if 
(result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) 
== false) {
+                    // has new partition added
+                    logger.debug("has new partition added");
+                    long earliest = KafkaClient.getEarliestOffset(consumer, 
topic, partitionInfo.partition());
+                    logger.debug("new partition " + partitionInfo.partition() 
+ " starts from " + earliest);
+                    
result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
                 }
             }
         }
 
         if (result.getEndOffset() == Long.MAX_VALUE) {
-            
result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube));
+            logger.debug("Seek end offsets from topic");
+            Map<Integer, Long> latestOffsets = 
KafkaClient.getCurrentOffsets(cube);
+            logger.debug("The end offsets are " + latestOffsets);
+
+            for (Integer partitionId : latestOffsets.keySet()) {
+                if 
(result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
+                    if 
(result.getSourcePartitionOffsetStart().get(partitionId) > 
latestOffsets.get(partitionId)) {
+                        throw new IllegalArgumentException("Partition " + 
partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller 
than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) 
+ ")");
+                    }
+                } else {
+                    throw new IllegalStateException("New partition added in 
between, retry.");
+                }
+            }
+            result.setSourcePartitionOffsetEnd(latestOffsets);
         }
 
         long totalStartOffset = 0, totalEndOffset = 0;
@@ -118,6 +141,7 @@ public class KafkaSource implements ISource {
         result.setStartOffset(totalStartOffset);
         result.setEndOffset(totalEndOffset);
 
+        logger.debug("parsePartitionBeforeBuild() return: " + result);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/516749e4/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a0bbd22..446c076 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
@@ -117,7 +118,7 @@ public class KafkaClient {
     }
 
     public static Map<Integer, Long> getCurrentOffsets(final CubeInstance 
cubeInstance) {
-        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getFactTable());
 
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
@@ -135,7 +136,7 @@ public class KafkaClient {
 
 
     public static Map<Integer, Long> getEarliestOffsets(final CubeInstance 
cubeInstance) {
-        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getFactTable());
 
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();

Reply via email to