This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new e7ab5df  KYLIN-4151 FileSplit ClassCastException in KafkaMRInput
e7ab5df is described below

commit e7ab5dfd4431a5b410813311fa5e5a983e9aa4f2
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Sat May 16 11:49:03 2020 +0800

    KYLIN-4151 FileSplit ClassCastException in KafkaMRInput
---
 .../java/org/apache/kylin/source/kafka/KafkaMRInput.java     | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 6f99e29..b546c5e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -93,8 +93,16 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
 
         @Override
         public String getInputSplitSignature(InputSplit inputSplit) {
-            FileSplit baseSplit = (FileSplit) inputSplit;
-            return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength();
+            FileSplit fs = null;
+            if (inputSplit instanceof FileSplit) {
+                fs = (FileSplit) inputSplit;
+            } else if (inputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+                fs = new FileSplit((org.apache.hadoop.mapreduce.lib.input.FileSplit) inputSplit);
+            } else {
+                throw new IllegalArgumentException("Wrong input split: " + inputSplit);
+            }
+
+            return fs.getPath().getName() + "_" + fs.getStart() + "_" + fs.getLength();
         }
     }
 