KYLIN-1417 TimedJsonStreamParser should be case insensitive
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ebce31e0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ebce31e0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ebce31e0

Branch: refs/heads/1.4-rc
Commit: ebce31e01cc4de1aed66a2fccf1e3998c45b7d06
Parents: 809fc62
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Feb 15 11:23:50 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Feb 15 15:50:40 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/source/kafka/TimedJsonStreamParser.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/ebce31e0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..65835cd 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,6 +40,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import kafka.message.MessageAndOffset;
 
@@ -102,7 +104,10 @@ public final class TimedJsonStreamParser extends StreamingParser {
     @Override
     public StreamingMessage parse(MessageAndOffset messageAndOffset) {
         try {
-            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            root.putAll(message);
+
             String tsStr = root.get(tsColName);
             //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
             //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));