KYLIN-1417 TimedJsonStreamParser should be case insensitive

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ebce31e0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ebce31e0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ebce31e0

Branch: refs/heads/1.4-rc
Commit: ebce31e01cc4de1aed66a2fccf1e3998c45b7d06
Parents: 809fc62
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Feb 15 11:23:50 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Feb 15 15:50:40 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/TimedJsonStreamParser.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ebce31e0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..65835cd 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,6 +40,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import kafka.message.MessageAndOffset;
 
@@ -102,7 +104,10 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
     @Override
     public StreamingMessage parse(MessageAndOffset messageAndOffset) {
         try {
-            Map<String, String> root = mapper.readValue(new 
ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new 
ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            ConcurrentMap<String, String> root = new 
ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            root.putAll(message);
+            
             String tsStr = root.get(tsColName);
             //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), 
"Timestamp field " + tsColName + //
             //" cannot be null, the message offset is " + 
messageAndOffset.getOffset() + " content is " + new 
String(messageAndOffset.getRawData()));

Reply via email to