Repository: kylin
Updated Branches:
  refs/heads/v1.6.0-rc1 c43f38f48 -> 516749e45 (forced update)
KYLIN-1726 KafkaSource may get wrong start offset when partition added.

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/516749e4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/516749e4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/516749e4

Branch: refs/heads/v1.6.0-rc1
Commit: 516749e457b00b9ab1260fb1150fd436cff33a61
Parents: 5d166aa
Author: shaofengshi <shaofeng...@apache.org>
Authored: Wed Nov 9 09:28:09 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Thu Nov 10 10:47:31 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/SourcePartition.java | 16 +++++++--
 .../apache/kylin/source/kafka/KafkaSource.java | 38 ++++++++++++++++----
 .../kylin/source/kafka/util/KafkaClient.java | 5 +--
 3 files changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/516749e4/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
index 8ba749d..e489704 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -18,8 +18,11 @@
 package org.apache.kylin.source;
+import java.util.HashMap;
 import java.util.Map;
+import com.google.common.base.Objects;
+
 /**
  */
 public class SourcePartition {
@@ -90,14 +93,23 @@ public class SourcePartition {
         this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
     }
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("startDate", startDate).add("endDate", endDate).add("startOffset",
startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString();
+    }
+
     public static SourcePartition getCopyOf(SourcePartition origin) {
         SourcePartition copy = new SourcePartition();
         copy.setStartDate(origin.getStartDate());
         copy.setEndDate(origin.getEndDate());
         copy.setStartOffset(origin.getStartOffset());
         copy.setEndOffset(origin.getEndOffset());
-        copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart());
-        copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd());
+        if (origin.getSourcePartitionOffsetStart() != null) {
+            copy.setSourcePartitionOffsetStart(new HashMap<>(origin.getSourcePartitionOffsetStart()));
+        }
+        if (origin.getSourcePartitionOffsetEnd() != null) {
+            copy.setSourcePartitionOffsetEnd(new HashMap<>(origin.getSourcePartitionOffsetEnd()));
+        }
         return copy;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/516749e4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 7a5d94f..b0c8e7f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
@@ -34,12 +35,16 @@ import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 //used by reflection
 public class KafkaSource implements ISource {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);
+
     @SuppressWarnings("unchecked")
     @Override
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
@@ -71,32 +76,50 @@ public class KafkaSource implements ISource {
         if (result.getStartOffset() == 0) {
             final CubeSegment last = cube.getLastSegment();
             if (last != null) {
+                logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd());
                 // from last seg's end position
                 result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
             } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+                logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart());
                 result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
                 // from the topic's very begining;
+                logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning.");
                 result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable());
+        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-            if
(partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) {
-                // has new partition added
-                for (int x = result.getSourcePartitionOffsetStart().size(); x < partitionInfos.size(); x++) {
-                    long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
-                    result.getSourcePartitionOffsetStart().put(partitionInfos.get(x).partition(), earliest);
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) {
+                    // has new partition added
+                    logger.debug("has new partition added");
+                    long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition());
+                    logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest);
+                    result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest);
                 }
             }
         }
         if (result.getEndOffset() == Long.MAX_VALUE) {
-            result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube));
+            logger.debug("Seek end offsets from topic");
+            Map<Integer, Long> latestOffsets = KafkaClient.getCurrentOffsets(cube);
+            logger.debug("The end offsets are " + latestOffsets);
+
+            for (Integer partitionId : latestOffsets.keySet()) {
+                if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) {
+                    if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) {
+                        throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")");
+                    }
+                } else {
+                    throw new IllegalStateException("New partition added in between, retry.");
+                }
+            }
+            result.setSourcePartitionOffsetEnd(latestOffsets);
         }
         long totalStartOffset = 0, totalEndOffset = 0;
@@ -118,6 +141,7 @@ public class KafkaSource implements ISource {
         result.setStartOffset(totalStartOffset);
         result.setEndOffset(totalEndOffset);
+
logger.debug("parsePartitionBeforeBuild() return: " + result);
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/516749e4/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index a0bbd22..446c076 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
@@ -117,7 +118,7 @@ public class KafkaClient {
     }
     public static Map<Integer, Long> getCurrentOffsets(final CubeInstance cubeInstance) {
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
@@ -135,7 +136,7 @@ public class KafkaClient {
     public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+        final KafkaConfig kafakaConfig =
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cubeInstance.getFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();