This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new e7ab5df  KYLIN-4151 FileSplit ClassCastException in KafkaMRInput
e7ab5df is described below

commit e7ab5dfd4431a5b410813311fa5e5a983e9aa4f2
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Sat May 16 11:49:03 2020 +0800

    KYLIN-4151 FileSplit ClassCastException in KafkaMRInput
---
 .../java/org/apache/kylin/source/kafka/KafkaMRInput.java     | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 6f99e29..b546c5e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -93,8 +93,16 @@ public class KafkaMRInput extends KafkaInputBase implements 
IMRInput {
 
         @Override
         public String getInputSplitSignature(InputSplit inputSplit) {
-            FileSplit baseSplit = (FileSplit) inputSplit;
-            return baseSplit.getPath().getName() + "_" + baseSplit.getStart() 
+ "_" + baseSplit.getLength();
+            FileSplit fs = null;
+            if (inputSplit instanceof FileSplit) {
+                fs = (FileSplit) inputSplit;
+            } else if (inputSplit instanceof 
org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+                fs = new 
FileSplit((org.apache.hadoop.mapreduce.lib.input.FileSplit) inputSplit);
+            } else {
+                throw new IllegalArgumentException("Wrong input split: " + 
inputSplit);
+            }
+
+            return fs.getPath().getName() + "_" + fs.getStart() + "_" + 
fs.getLength();
         }
     }
 

Reply via email to